Skip to content

Commit cb6bd83

Browse files
liancheng authored and marmbrus committed
[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types
SPARK-4407 was detected while working on SPARK-4309. Merged these two into a single PR since 1.2.0 RC is approaching. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3178) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes apache#3178 from liancheng/date-for-thriftserver and squashes the following commits: 6f71d0b [Cheng Lian] Makes toHiveString static 26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim a92882a [Cheng Lian] Updates HiveShim for 0.13.1 73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0)
1 parent 7850e0c commit cb6bd83

File tree

4 files changed

+142
-115
lines changed

4 files changed

+142
-115
lines changed

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.io.File
2121
import java.net.ServerSocket
22-
import java.sql.{DriverManager, Statement}
22+
import java.sql.{Date, DriverManager, Statement}
2323
import java.util.concurrent.TimeoutException
2424

25+
import scala.collection.JavaConversions._
2526
import scala.collection.mutable.ArrayBuffer
2627
import scala.concurrent.duration._
2728
import scala.concurrent.{Await, Promise}
@@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim
5152
class HiveThriftServer2Suite extends FunSuite with Logging {
5253
Class.forName(classOf[HiveDriver].getCanonicalName)
5354

55+
object TestData {
56+
def getTestDataFilePath(name: String) = {
57+
Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
58+
}
59+
60+
val smallKv = getTestDataFilePath("small_kv.txt")
61+
val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
62+
}
63+
5464
def randomListeningPort = {
5565
// Let the system to choose a random available port to avoid collision with other parallel
5666
// builds.
@@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
145155
}
146156
}
147157

148-
val env = Seq(
149-
// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
150-
"SPARK_TESTING" -> "0",
151-
// Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
152-
// proper version information from the jar manifest.
153-
"SPARK_PREPEND_CLASSES" -> "")
158+
// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
159+
val env = Seq("SPARK_TESTING" -> "0")
154160

155161
Process(command, None, env: _*).run(ProcessLogger(
156162
captureThriftServerOutput("stdout"),
@@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
194200

195201
test("Test JDBC query execution") {
196202
withJdbcStatement() { statement =>
197-
val dataFilePath =
198-
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
199-
200-
val queries =
201-
s"""SET spark.sql.shuffle.partitions=3;
202-
|CREATE TABLE test(key INT, val STRING);
203-
|LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
204-
|CACHE TABLE test;
205-
""".stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
203+
val queries = Seq(
204+
"SET spark.sql.shuffle.partitions=3",
205+
"DROP TABLE IF EXISTS test",
206+
"CREATE TABLE test(key INT, val STRING)",
207+
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
208+
"CACHE TABLE test")
206209

207210
queries.foreach(statement.execute)
208211

@@ -216,14 +219,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
216219

217220
test("SPARK-3004 regression: result set containing NULL") {
218221
withJdbcStatement() { statement =>
219-
val dataFilePath =
220-
Thread.currentThread().getContextClassLoader.getResource(
221-
"data/files/small_kv_with_null.txt")
222-
223222
val queries = Seq(
224223
"DROP TABLE IF EXISTS test_null",
225224
"CREATE TABLE test_null(key INT, val STRING)",
226-
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
225+
s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
227226

228227
queries.foreach(statement.execute)
229228

@@ -270,24 +269,63 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
270269

271270
test("SPARK-4292 regression: result set iterator issue") {
272271
withJdbcStatement() { statement =>
273-
val dataFilePath =
274-
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
275-
276272
val queries = Seq(
277273
"DROP TABLE IF EXISTS test_4292",
278274
"CREATE TABLE test_4292(key INT, val STRING)",
279-
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
275+
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
280276

281277
queries.foreach(statement.execute)
282278

283279
val resultSet = statement.executeQuery("SELECT key FROM test_4292")
284280

285281
Seq(238, 86, 311, 27, 165).foreach { key =>
286282
resultSet.next()
287-
assert(resultSet.getInt(1) == key)
283+
assert(resultSet.getInt(1) === key)
288284
}
289285

290286
statement.executeQuery("DROP TABLE IF EXISTS test_4292")
291287
}
292288
}
289+
290+
test("SPARK-4309 regression: Date type support") {
291+
withJdbcStatement() { statement =>
292+
val queries = Seq(
293+
"DROP TABLE IF EXISTS test_date",
294+
"CREATE TABLE test_date(key INT, value STRING)",
295+
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
296+
297+
queries.foreach(statement.execute)
298+
299+
assertResult(Date.valueOf("2011-01-01")) {
300+
val resultSet = statement.executeQuery(
301+
"SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
302+
resultSet.next()
303+
resultSet.getDate(1)
304+
}
305+
}
306+
}
307+
308+
test("SPARK-4407 regression: Complex type support") {
309+
withJdbcStatement() { statement =>
310+
val queries = Seq(
311+
"DROP TABLE IF EXISTS test_map",
312+
"CREATE TABLE test_map(key INT, value STRING)",
313+
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
314+
315+
queries.foreach(statement.execute)
316+
317+
assertResult("""{238:"val_238"}""") {
318+
val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
319+
resultSet.next()
320+
resultSet.getString(1)
321+
}
322+
323+
assertResult("""["238","val_238"]""") {
324+
val resultSet = statement.executeQuery(
325+
"SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
326+
resultSet.next()
327+
resultSet.getString(1)
328+
}
329+
}
330+
}
293331
}

sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
import java.sql.Timestamp
20+
import java.sql.{Date, Timestamp}
2121
import java.util.{ArrayList => JArrayList, Map => JMap}
2222

2323
import scala.collection.JavaConversions._
@@ -131,14 +131,13 @@ private[hive] class SparkExecuteStatementOperation(
131131
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
132132
case ShortType =>
133133
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
134+
case DateType =>
135+
to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
134136
case TimestampType =>
135137
to.addColumnValue(
136138
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
137139
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
138-
val hiveString = result
139-
.queryExecution
140-
.asInstanceOf[HiveContext#QueryExecution]
141-
.toHiveString((from.get(ordinal), dataTypes(ordinal)))
140+
val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
142141
to.addColumnValue(ColumnValue.stringValue(hiveString))
143142
}
144143
}
@@ -163,6 +162,8 @@ private[hive] class SparkExecuteStatementOperation(
163162
to.addColumnValue(ColumnValue.byteValue(null))
164163
case ShortType =>
165164
to.addColumnValue(ColumnValue.shortValue(null))
165+
case DateType =>
166+
to.addColumnValue(ColumnValue.dateValue(null))
166167
case TimestampType =>
167168
to.addColumnValue(ColumnValue.timestampValue(null))
168169
case BinaryType | _: ArrayType | _: StructType | _: MapType =>

sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.security.PrivilegedExceptionAction
21-
import java.sql.Timestamp
21+
import java.sql.{Date, Timestamp}
2222
import java.util.concurrent.Future
2323
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
2424

@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
113113
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
114114
dataTypes(ordinal) match {
115115
case StringType =>
116-
to += from.get(ordinal).asInstanceOf[String]
116+
to += from.getString(ordinal)
117117
case IntegerType =>
118118
to += from.getInt(ordinal)
119119
case BooleanType =>
@@ -123,33 +123,30 @@ private[hive] class SparkExecuteStatementOperation(
123123
case FloatType =>
124124
to += from.getFloat(ordinal)
125125
case DecimalType() =>
126-
to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
126+
to += from.getAs[BigDecimal](ordinal).bigDecimal
127127
case LongType =>
128128
to += from.getLong(ordinal)
129129
case ByteType =>
130130
to += from.getByte(ordinal)
131131
case ShortType =>
132132
to += from.getShort(ordinal)
133+
case DateType =>
134+
to += from.getAs[Date](ordinal)
133135
case TimestampType =>
134-
to += from.get(ordinal).asInstanceOf[Timestamp]
135-
case BinaryType =>
136-
to += from.get(ordinal).asInstanceOf[String]
137-
case _: ArrayType =>
138-
to += from.get(ordinal).asInstanceOf[String]
139-
case _: StructType =>
140-
to += from.get(ordinal).asInstanceOf[String]
141-
case _: MapType =>
142-
to += from.get(ordinal).asInstanceOf[String]
136+
to += from.getAs[Timestamp](ordinal)
137+
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
138+
val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
139+
to += hiveString
143140
}
144141
}
145142

146143
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
147144
validateDefaultFetchOrientation(order)
148145
assertState(OperationState.FINISHED)
149146
setHasResultSet(true)
150-
val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
147+
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
151148
if (!iter.hasNext) {
152-
reultRowSet
149+
resultRowSet
153150
} else {
154151
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
155152
val maxRows = maxRowsL.toInt
@@ -166,10 +163,10 @@ private[hive] class SparkExecuteStatementOperation(
166163
}
167164
curCol += 1
168165
}
169-
reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
166+
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
170167
curRow += 1
171168
}
172-
reultRowSet
169+
resultRowSet
173170
}
174171
}
175172

0 commit comments

Comments (0)