Commit 45ce327

Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"

Author: Michael Armbrust <[email protected]>

Closes apache#3292 from marmbrus/revert4309 and squashes the following commits:

808e96e [Michael Armbrust] Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"

1 parent cb6bd83

File tree: 4 files changed, +115 −142 lines

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
26 additions, 64 deletions

@@ -19,10 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.File
 import java.net.ServerSocket
-import java.sql.{Date, DriverManager, Statement}
+import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
@@ -52,15 +51,6 @@ import org.apache.spark.sql.hive.HiveShim
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
-  object TestData {
-    def getTestDataFilePath(name: String) = {
-      Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
-    }
-
-    val smallKv = getTestDataFilePath("small_kv.txt")
-    val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
-  }
-
   def randomListeningPort = {
     // Let the system to choose a random available port to avoid collision with other parallel
     // builds.
@@ -155,8 +145,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       }
     }
 
-    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
-    val env = Seq("SPARK_TESTING" -> "0")
+    val env = Seq(
+      // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+      "SPARK_TESTING" -> "0",
+      // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
+      // proper version information from the jar manifest.
+      "SPARK_PREPEND_CLASSES" -> "")
 
     Process(command, None, env: _*).run(ProcessLogger(
       captureThriftServerOutput("stdout"),
@@ -200,12 +194,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("Test JDBC query execution") {
     withJdbcStatement() { statement =>
-      val queries = Seq(
-        "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+           """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
 
@@ -219,10 +216,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-3004 regression: result set containing NULL") {
     withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource(
+          "data/files/small_kv_with_null.txt")
+
       val queries = Seq(
         "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
 
       queries.foreach(statement.execute)
 
@@ -269,63 +270,24 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-4292 regression: result set iterator issue") {
     withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
       val queries = Seq(
         "DROP TABLE IF EXISTS test_4292",
        "CREATE TABLE test_4292(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
 
       queries.foreach(statement.execute)
 
       val resultSet = statement.executeQuery("SELECT key FROM test_4292")
 
       Seq(238, 86, 311, 27, 165).foreach { key =>
         resultSet.next()
-        assert(resultSet.getInt(1) === key)
+        assert(resultSet.getInt(1) == key)
       }
 
       statement.executeQuery("DROP TABLE IF EXISTS test_4292")
     }
   }
-
-  test("SPARK-4309 regression: Date type support") {
-    withJdbcStatement() { statement =>
-      val queries = Seq(
-        "DROP TABLE IF EXISTS test_date",
-        "CREATE TABLE test_date(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
-
-      queries.foreach(statement.execute)
-
-      assertResult(Date.valueOf("2011-01-01")) {
-        val resultSet = statement.executeQuery(
-          "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
-        resultSet.next()
-        resultSet.getDate(1)
-      }
-    }
-  }
-
-  test("SPARK-4407 regression: Complex type support") {
-    withJdbcStatement() { statement =>
-      val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
-        "CREATE TABLE test_map(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
-
-      queries.foreach(statement.execute)
-
-      assertResult("""{238:"val_238"}""") {
-        val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
-        resultSet.next()
-        resultSet.getString(1)
-      }
-
-      assertResult("""["238","val_238"]""") {
-        val resultSet = statement.executeQuery(
-          "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
-        resultSet.next()
-        resultSet.getString(1)
-      }
-    }
-  }
 }
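
The tests above all drive the Thrift server through plain JDBC. For reference, a minimal standalone sketch of that access pattern, assuming a server already listening on a hypothetical localhost:10000; the connection URL, driver class string, and table name here are illustrative and not taken from this commit (the suite itself picks a random free port and registers the driver via classOf[HiveDriver]):

import java.sql.{Connection, DriverManager, ResultSet}

object ThriftServerJdbcSketch {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver, as the suite does with Class.forName.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Hypothetical HiveServer2-style connection URL.
    val conn: Connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    try {
      val statement = conn.createStatement()

      // Mirror the DROP/CREATE sequence used by the tests above.
      Seq(
        "DROP TABLE IF EXISTS example",
        "CREATE TABLE example(key INT, val STRING)").foreach(statement.execute)

      // Read rows back through the standard JDBC ResultSet cursor.
      val resultSet: ResultSet = statement.executeQuery("SELECT key, val FROM example")
      while (resultSet.next()) {
        println(s"${resultSet.getInt(1)} -> ${resultSet.getString(2)}")
      }
    } finally {
      conn.close()
    }
  }
}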

sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
5 additions, 6 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
 import scala.collection.JavaConversions._
@@ -131,13 +131,14 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
-      case DateType =>
-        to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
       case TimestampType =>
         to.addColumnValue(
           ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = result
+          .queryExecution
+          .asInstanceOf[HiveContext#QueryExecution]
+          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to.addColumnValue(ColumnValue.stringValue(hiveString))
     }
   }
@@ -162,8 +163,6 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(null))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(null))
-      case DateType =>
-        to.addColumnValue(ColumnValue.dateValue(null))
       case TimestampType =>
         to.addColumnValue(ColumnValue.timestampValue(null))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>

sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
16 additions, 13 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
     dataTypes(ordinal) match {
       case StringType =>
-        to += from.getString(ordinal)
+        to += from.get(ordinal).asInstanceOf[String]
      case IntegerType =>
         to += from.getInt(ordinal)
       case BooleanType =>
@@ -123,30 +123,33 @@ private[hive] class SparkExecuteStatementOperation(
       case FloatType =>
         to += from.getFloat(ordinal)
       case DecimalType() =>
-        to += from.getAs[BigDecimal](ordinal).bigDecimal
+        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
       case LongType =>
         to += from.getLong(ordinal)
       case ByteType =>
         to += from.getByte(ordinal)
       case ShortType =>
         to += from.getShort(ordinal)
-      case DateType =>
-        to += from.getAs[Date](ordinal)
       case TimestampType =>
-        to += from.getAs[Timestamp](ordinal)
-      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
-        to += hiveString
+        to += from.get(ordinal).asInstanceOf[Timestamp]
+      case BinaryType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: ArrayType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: StructType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: MapType =>
+        to += from.get(ordinal).asInstanceOf[String]
     }
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
     if (!iter.hasNext) {
-      resultRowSet
+      reultRowSet
     } else {
       // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
       val maxRows = maxRowsL.toInt
@@ -163,10 +166,10 @@ private[hive] class SparkExecuteStatementOperation(
           }
           curCol += 1
         }
-        resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+        reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
         curRow += 1
       }
-      resultRowSet
+      reultRowSet
     }
   }

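For orientation, the complex-type handling reverted above ultimately hands values back as Hive-style strings, the format the deleted SPARK-4407 test asserted: {238:"val_238"} for a map and ["238","val_238"] for an array. A toy sketch of that rendering convention, assuming plain Scala collections as input; this only illustrates the output format and is not Spark's actual toHiveString implementation:

object HiveStringSketch {
  // Render a value the way the reverted tests expected: maps as {k:v},
  // sequences as [a,b], strings quoted, everything else via toString.
  def toHiveString(value: Any): String = value match {
    case m: Map[_, _] =>
      m.map { case (k, v) => s"${toHiveString(k)}:${toHiveString(v)}" }.mkString("{", ",", "}")
    case s: Seq[_] =>
      s.map(toHiveString).mkString("[", ",", "]")
    case s: String =>
      "\"" + s + "\""
    case other =>
      other.toString
  }

  def main(args: Array[String]): Unit = {
    println(toHiveString(Map(238 -> "val_238")))  // {238:"val_238"}
    println(toHiveString(Seq("238", "val_238")))  // ["238","val_238"]
  }
}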