Skip to content

Commit 62b4c73

Browse files
chenghao-intelmarmbrus
authored andcommitted
[SPARK-7662] [SQL] Resolve correct names for generator in projection
``` select explode(map(value, key)) from src; ``` Throws exception ``` org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got _c0 ; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:43) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGenerate$$makeGeneratorOutput(Analyzer.scala:605) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:562) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:548) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:548) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:538) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222) ``` Author: Cheng Hao <[email protected]> Closes #6178 from chenghao-intel/explode and squashes the following commits: 916fbe9 [Cheng Hao] add more strict rules for TGF alias 5c3f2c5 [Cheng Hao] fix bug in unit test e1d93ab [Cheng Hao] Add more unit test 19db09e [Cheng Hao] resolve names for generator in projection (cherry picked from commit bcb1ff8) Signed-off-by: Michael Armbrust <[email protected]>
1 parent 87fa8cc commit 62b4c73

File tree

3 files changed

+42
-4
lines changed

3 files changed

+42
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,21 @@ class Analyzer(
561561
/** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */
562562
private object AliasedGenerator {
563563
def unapply(e: Expression): Option[(Generator, Seq[String])] = e match {
564+
case Alias(g: Generator, name)
565+
if g.elementTypes.size > 1 && java.util.regex.Pattern.matches("_c[0-9]+", name) => {
566+
// Assume the default name given by parser is "_c[0-9]+",
567+
// TODO in long term, move the naming logic from Parser to Analyzer.
568+
// In projection, Parser gave default name for TGF as does for normal UDF,
569+
// but the TGF probably have multiple output columns/names.
570+
// e.g. SELECT explode(map(key, value)) FROM src;
571+
// Let's simply ignore the default given name for this case.
572+
Some((g, Nil))
573+
}
574+
case Alias(g: Generator, name) if g.elementTypes.size > 1 =>
575+
// If not given the default names, and the TGF with multiple output columns
576+
failAnalysis(
577+
s"""Expect multiple names given for ${g.getClass.getName},
578+
|but only single name '${name}' specified""".stripMargin)
564579
case Alias(g: Generator, name) => Some((g, name :: Nil))
565580
case MultiAlias(g: Generator, names) => Some(g, names)
566581
case _ => None

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
111111
| SELECT key FROM gen_tmp ORDER BY key ASC;
112112
""".stripMargin)
113113

114-
test("multiple generator in projection") {
114+
test("multiple generators in projection") {
115115
intercept[AnalysisException] {
116-
sql("SELECT explode(map(key, value)), key FROM src").collect()
116+
sql("SELECT explode(array(key, key)), explode(array(key, key)) FROM src").collect()
117117
}
118118

119119
intercept[AnalysisException] {
120-
sql("SELECT explode(map(key, value)) as k1, k2, key FROM src").collect()
120+
sql("SELECT explode(array(key, key)) as k1, explode(array(key, key)) FROM src").collect()
121121
}
122122
}
123123

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,36 @@ class SQLQuerySuite extends QueryTest {
548548
dropTempTable("data")
549549
}
550550

551-
test("resolve udtf with single alias") {
551+
test("resolve udtf in projection #1") {
552552
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
553553
read.json(rdd).registerTempTable("data")
554554
val df = sql("SELECT explode(a) AS val FROM data")
555555
val col = df("val")
556556
}
557557

558+
test("resolve udtf in projection #2") {
559+
val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
560+
jsonRDD(rdd).registerTempTable("data")
561+
checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
562+
checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
563+
intercept[AnalysisException] {
564+
sql("SELECT explode(map(1, 1)) as k1 FROM data LIMIT 1")
565+
}
566+
567+
intercept[AnalysisException] {
568+
sql("SELECT explode(map(1, 1)) as (k1, k2, k3) FROM data LIMIT 1")
569+
}
570+
}
571+
572+
// TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
573+
test("TGF with non-TGF in projection") {
574+
val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
575+
jsonRDD(rdd).registerTempTable("data")
576+
checkAnswer(
577+
sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
578+
Row("1", "1", "1", "1") :: Nil)
579+
}
580+
558581
test("logical.Project should not be resolved if it contains aggregates or generators") {
559582
// This test is used to test the fix of SPARK-5875.
560583
// The original issue was that Project's resolved will be true when it contains

0 commit comments

Comments
 (0)