Skip to content

Commit a9efce4

Browse files
kiszk authored and cloud-fan committed
[SPARK-19104][BACKPORT-2.1][SQL] Lambda variables in ExternalMapToCatalyst should be global
## What changes were proposed in this pull request? This PR is a backport of apache#18418 to Spark 2.1. [SPARK-21391](https://issues.apache.org/jira/browse/SPARK-21391) reported this problem in Spark 2.1. The issue happens in `ExternalMapToCatalyst`. For example, the following code creates an `ExternalMapToCatalyst` expression to convert a Scala Map to the Catalyst map format. ``` val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100)))) val ds = spark.createDataset(data) ``` The `valueConverter` in `ExternalMapToCatalyst` looks like: ``` if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true))) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).name, true), value, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).value) ``` There is a `CreateNamedStruct` expression (`named_struct`) that creates a row from `InnerData.name` and `InnerData.value`, both of which are referenced through `ExternalMapToCatalyst_value52`. Because `ExternalMapToCatalyst_value52` is a local variable, when `CreateNamedStruct` splits its expressions into individual functions, the local variable can no longer be accessed. ## How was this patch tested? Added a new test case to `DatasetPrimitiveSuite`. Author: Kazuaki Ishizaki <[email protected]> Closes apache#18627 from kiszk/SPARK-21391.
1 parent ca4d2aa commit a9efce4

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,12 @@ case class ExternalMapToCatalyst private(
660660
val entry = ctx.freshName("entry")
661661
val entries = ctx.freshName("entries")
662662

663+
val keyElementJavaType = ctx.javaType(keyType)
664+
val valueElementJavaType = ctx.javaType(valueType)
665+
ctx.addMutableState(keyElementJavaType, key, "")
666+
ctx.addMutableState("boolean", valueIsNull, "")
667+
ctx.addMutableState(valueElementJavaType, value, "")
668+
663669
val (defineEntries, defineKeyValue) = child.dataType match {
664670
case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
665671
val javaIteratorCls = classOf[java.util.Iterator[_]].getName
@@ -671,8 +677,8 @@ case class ExternalMapToCatalyst private(
671677
val defineKeyValue =
672678
s"""
673679
final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
674-
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry.getKey();
675-
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry.getValue();
680+
$key = (${ctx.boxedType(keyType)}) $entry.getKey();
681+
$value = (${ctx.boxedType(valueType)}) $entry.getValue();
676682
"""
677683

678684
defineEntries -> defineKeyValue
@@ -686,17 +692,17 @@ case class ExternalMapToCatalyst private(
686692
val defineKeyValue =
687693
s"""
688694
final $scalaMapEntryCls $entry = ($scalaMapEntryCls) $entries.next();
689-
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry._1();
690-
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry._2();
695+
$key = (${ctx.boxedType(keyType)}) $entry._1();
696+
$value = (${ctx.boxedType(valueType)}) $entry._2();
691697
"""
692698

693699
defineEntries -> defineKeyValue
694700
}
695701

696702
val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
697-
s"boolean $valueIsNull = false;"
703+
s"$valueIsNull = false;"
698704
} else {
699-
s"boolean $valueIsNull = $value == null;"
705+
s"$valueIsNull = $value == null;"
700706
}
701707

702708
val arrayCls = classOf[GenericArrayData].getName

sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import org.apache.spark.sql.test.SharedSQLContext
2121

2222
case class IntClass(value: Int)
2323

24+
case class InnerData(name: String, value: Int)
25+
case class NestedData(id: Int, param: Map[String, InnerData])
26+
2427
package object packageobject {
2528
case class PackageClass(value: Int)
2629
}
@@ -135,4 +138,9 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
135138
checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
136139
}
137140

141+
test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be global") {
142+
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
143+
val ds = spark.createDataset(data)
144+
checkDataset(ds, data: _*)
145+
}
138146
}

0 commit comments

Comments
 (0)