Skip to content

Commit c49c70c

Browse files
committed
Avoid dynamic dispatching when unwrapping Hive data.
Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
1 parent 366c0c4 commit c49c70c

File tree

2 files changed: +18 −15 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
2626
import org.apache.hadoop.hive.serde.serdeConstants
2727
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
2828
import org.apache.hadoop.hive.serde2.objectinspector._
29-
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
30-
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
29+
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
3130
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
3231
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
3332
import org.apache.hadoop.io.Writable
@@ -95,29 +94,34 @@ case class HiveTableScan(
9594
attributes.map { a =>
9695
val ordinal = relation.partitionKeys.indexOf(a)
9796
if (ordinal >= 0) {
97+
val dataType = relation.partitionKeys(ordinal).dataType
9898
(_: Any, partitionKeys: Array[String]) => {
99-
val value = partitionKeys(ordinal)
100-
val dataType = relation.partitionKeys(ordinal).dataType
101-
unwrapHiveData(castFromString(value, dataType))
99+
castFromString(partitionKeys(ordinal), dataType)
102100
}
103101
} else {
104102
val ref = objectInspector.getAllStructFieldRefs
105103
.find(_.getFieldName == a.name)
106104
.getOrElse(sys.error(s"Can't find attribute $a"))
105+
val fieldObjectInspector = ref.getFieldObjectInspector
106+
107+
val unwrapHiveData = fieldObjectInspector match {
108+
case _: HiveVarcharObjectInspector =>
109+
(value: Any) => value.asInstanceOf[HiveVarchar].getValue
110+
case _: HiveDecimalObjectInspector =>
111+
(value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
112+
case _ =>
113+
identity[Any] _
114+
}
115+
107116
(row: Any, _: Array[String]) => {
108117
val data = objectInspector.getStructFieldData(row, ref)
109-
unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
118+
val hiveData = unwrapData(data, fieldObjectInspector)
119+
if (hiveData != null) unwrapHiveData(hiveData) else null
110120
}
111121
}
112122
}
113123
}
114124

115-
private def unwrapHiveData(value: Any) = value match {
116-
case varchar: HiveVarchar => varchar.getValue
117-
case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
118-
case other => other
119-
}
120-
121125
private def castFromString(value: String, dataType: DataType) = {
122126
Cast(Literal(value), dataType).eval(null)
123127
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,14 @@ abstract class HiveComparisonTest
133133
def isSorted(plan: LogicalPlan): Boolean = plan match {
134134
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
135135
case PhysicalOperation(_, _, Sort(_, _)) => true
136-
case _ => plan.children.iterator.map(isSorted).exists(_ == true)
136+
case _ => plan.children.iterator.exists(isSorted)
137137
}
138138

139139
val orderedAnswer = hiveQuery.logical match {
140140
// Clean out non-deterministic time schema info.
141141
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
142142
case _: ExplainCommand => answer
143-
case plan if isSorted(plan) => answer
144-
case _ => answer.sorted
143+
case plan => if (isSorted(plan)) answer else answer.sorted
145144
}
146145
orderedAnswer.map(cleanPaths)
147146
}

0 commit comments

Comments (0)