
Commit 8361078

chenghao-intel authored and marmbrus committed
[SPARK-5009] [SQL] Long keyword support in SQL Parsers
* `SqlLexical.allCaseVersions` will cause a `StackOverflowException` if the keyword is too long; this patch fixes that by normalizing all of the keywords in `SqlLexical`.
* Make a unified SparkSQLParser for sharing the common code.

Author: Cheng Hao <[email protected]>

Closes #3926 from chenghao-intel/long_keyword and squashes the following commits:

686660f [Cheng Hao] Support Long Keyword and Refactor the SQLParsers
1 parent 812d367 commit 8361078
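For context, here is a minimal standalone sketch of why the old expansion breaks down. The `allCaseVersions` helper is the one this patch removes from `SqlLexical` (see the first diff below); the wrapping demo object and the printed counts are illustrative assumptions, not part of the change.

object OldStyleKeywordExpansion {
  /** Generate all variations of upper and lower case of a given string
    * (the same helper this patch removes from SqlLexical). */
  def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
    if (s.isEmpty) {
      Stream(prefix)
    } else {
      allCaseVersions(s.tail, prefix + s.head.toLower) #:::
        allCaseVersions(s.tail, prefix + s.head.toUpper)
    }
  }

  def main(args: Array[String]): Unit = {
    // A keyword of length n expands into 2^n case variants, each of which used to be
    // registered as a separate reserved word in SqlLexical.
    println(allCaseVersions("select").length) // 64 variants for a 6-letter keyword
    // A 27-character keyword such as THISISASUPERLONGKEYWORDTEST would have 2^27
    // variants; the commit message attributes the StackOverflowException to this
    // expansion, so here we only peek at the first few lazily produced variants.
    allCaseVersions("THISISASUPERLONGKEYWORDTEST").take(3).foreach(println)
  }
}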

File tree: 8 files changed, 128 additions and 81 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala

Lines changed: 43 additions & 16 deletions
@@ -25,15 +25,42 @@ import scala.util.parsing.input.CharArrayReader.EofCh
 
 import org.apache.spark.sql.catalyst.plans.logical._
 
+private[sql] object KeywordNormalizer {
+  def apply(str: String) = str.toLowerCase()
+}
+
 private[sql] abstract class AbstractSparkSQLParser
   extends StandardTokenParsers with PackratParsers {
 
-  def apply(input: String): LogicalPlan = phrase(start)(new lexical.Scanner(input)) match {
-    case Success(plan, _) => plan
-    case failureOrError => sys.error(failureOrError.toString)
+  def apply(input: String): LogicalPlan = {
+    // Initialize the keywords.
+    lexical.initialize(reservedWords)
+    phrase(start)(new lexical.Scanner(input)) match {
+      case Success(plan, _) => plan
+      case failureOrError => sys.error(failureOrError.toString)
+    }
   }
 
-  protected case class Keyword(str: String)
+  protected case class Keyword(str: String) {
+    def normalize = KeywordNormalizer(str)
+    def parser: Parser[String] = normalize
+  }
+
+  protected implicit def asParser(k: Keyword): Parser[String] = k.parser
+
+  // By default, use reflection to find the reserved words defined in the subclass.
+  // NOTICE: since the Keyword properties are defined by the subclass, we cannot call
+  // this method during parent class instantiation, because the subclass instance
+  // has not been created yet.
+  protected lazy val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].normalize)
+
+  // The keyword set starts empty; it is filled in later via lexical.initialize().
+  override val lexical = new SqlLexical
 
   protected def start: Parser[LogicalPlan]
 
@@ -52,18 +79,27 @@ private[sql] abstract class AbstractSparkSQLParser
   }
 }
 
-class SqlLexical(val keywords: Seq[String]) extends StdLexical {
+class SqlLexical extends StdLexical {
   case class FloatLit(chars: String) extends Token {
     override def toString = chars
   }
 
-  reserved ++= keywords.flatMap(w => allCaseVersions(w))
+  /* This is a workaround to support setting the reserved words lazily. */
+  def initialize(keywords: Seq[String]): Unit = {
+    reserved.clear()
+    reserved ++= keywords
+  }
 
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
     ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
   )
 
+  protected override def processIdent(name: String) = {
+    val token = KeywordNormalizer(name)
+    if (reserved contains token) Keyword(token) else Identifier(name)
+  }
+
   override lazy val token: Parser[Token] =
     ( identChar ~ (identChar | digit).* ^^
       { case first ~ rest => processIdent((first :: rest).mkString) }
@@ -94,14 +130,5 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
     | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
     | '/' ~ '*' ~ failure("unclosed comment")
     ).*
-
-  /** Generate all variations of upper and lower case of a given string */
-  def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
-    if (s.isEmpty) {
-      Stream(prefix)
-    } else {
-      allCaseVersions(s.tail, prefix + s.head.toLower) #:::
-        allCaseVersions(s.tail, prefix + s.head.toUpper)
-    }
-  }
 }
+
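To make the normalization idea concrete, here is a small self-contained sketch of the scheme the new code uses. The `NormalizedKeywordDemo` object and its member names are illustrative assumptions; the behavior mirrors `KeywordNormalizer`, `SqlLexical.initialize`, and the `processIdent` override above: reserved words are stored once in lower-case form and every scanned identifier is lower-cased before the lookup, so keyword matching is case-insensitive without enumerating 2^n case variants.

import scala.collection.mutable

object NormalizedKeywordDemo {
  private val reserved = mutable.HashSet.empty[String]

  // Mirrors KeywordNormalizer: a single canonical spelling per keyword.
  def normalize(str: String): String = str.toLowerCase()

  // Mirrors SqlLexical.initialize plus reservedWords: one reserved entry per keyword, not 2^n.
  def initialize(keywords: Seq[String]): Unit = {
    reserved.clear()
    reserved ++= keywords.map(normalize)
  }

  // Mirrors the processIdent override: a token is a keyword iff its normalized
  // form is in the reserved set; otherwise it stays an ordinary identifier.
  def isKeyword(ident: String): Boolean = reserved contains normalize(ident)

  def main(args: Array[String]): Unit = {
    initialize(Seq("SELECT", "THISISASUPERLONGKEYWORDTEST"))
    println(isKeyword("select"))                      // true
    println(isKeyword("SeLeCt"))                      // true
    println(isKeyword("ThisIsASuperLongKeyWordTest")) // true, regardless of keyword length
    println(isKeyword("someColumn"))                  // false
  }
}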

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 2 additions & 13 deletions
@@ -36,9 +36,8 @@ import org.apache.spark.sql.types._
  * for a SQL like language should checkout the HiveQL support in the sql/hive sub-project.
  */
 class SqlParser extends AbstractSparkSQLParser {
-  protected implicit def asParser(k: Keyword): Parser[String] =
-    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
+  // Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
+  // properties of this class via reflection at runtime to construct the SqlLexical object.
   protected val ABS = Keyword("ABS")
   protected val ALL = Keyword("ALL")
   protected val AND = Keyword("AND")
@@ -108,16 +107,6 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val WHEN = Keyword("WHEN")
   protected val WHERE = Keyword("WHERE")
 
-  // Use reflection to find the reserved words defined in this class.
-  protected val reservedWords =
-    this
-      .getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = new SqlLexical(reservedWords)
-
   protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
     exprs.zipWithIndex.map {
       case (ne: NamedExpression, _) => ne
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SqlParserSuite.scala

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.scalatest.FunSuite
+
+private[sql] case class TestCommand(cmd: String) extends Command
+
+private[sql] class SuperLongKeywordTestParser extends AbstractSparkSQLParser {
+  protected val EXECUTE = Keyword("THISISASUPERLONGKEYWORDTEST")
+
+  override protected lazy val start: Parser[LogicalPlan] = set
+
+  private lazy val set: Parser[LogicalPlan] =
+    EXECUTE ~> ident ^^ {
+      case fileName => TestCommand(fileName)
+    }
+}
+
+private[sql] class CaseInsensitiveTestParser extends AbstractSparkSQLParser {
+  protected val EXECUTE = Keyword("EXECUTE")
+
+  override protected lazy val start: Parser[LogicalPlan] = set
+
+  private lazy val set: Parser[LogicalPlan] =
+    EXECUTE ~> ident ^^ {
+      case fileName => TestCommand(fileName)
+    }
+}
+
+class SqlParserSuite extends FunSuite {
+
+  test("test long keyword") {
+    val parser = new SuperLongKeywordTestParser
+    assert(TestCommand("NotRealCommand") === parser("ThisIsASuperLongKeyWordTest NotRealCommand"))
+  }
+
+  test("test case insensitive") {
+    val parser = new CaseInsensitiveTestParser
+    assert(TestCommand("NotRealCommand") === parser("EXECUTE NotRealCommand"))
+    assert(TestCommand("NotRealCommand") === parser("execute NotRealCommand"))
+    assert(TestCommand("NotRealCommand") === parser("exEcute NotRealCommand"))
+  }
+}

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   protected[sql] def parseSql(sql: String): LogicalPlan = {
-    ddlParser(sql).getOrElse(sqlParser(sql))
+    ddlParser(sql, false).getOrElse(sqlParser(sql))
   }
 
   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))

sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala

Lines changed: 2 additions & 13 deletions
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql
 
+
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.catalyst.{SqlLexical, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
@@ -61,18 +62,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
   protected val TABLE = Keyword("TABLE")
   protected val UNCACHE = Keyword("UNCACHE")
 
-  protected implicit def asParser(k: Keyword): Parser[String] =
-    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
-  private val reservedWords: Seq[String] =
-    this
-      .getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = new SqlLexical(reservedWords)
-
   override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
 
   private lazy val cache: Parser[LogicalPlan] =

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 15 additions & 24 deletions
@@ -18,44 +18,42 @@
 package org.apache.spark.sql.sources
 
 import scala.language.implicitConversions
-import scala.util.parsing.combinator.syntactical.StandardTokenParsers
-import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.SqlLexical
+import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+
 /**
  * A parser for foreign DDL commands.
  */
-private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
-
-  def apply(input: String): Option[LogicalPlan] = {
-    phrase(ddl)(new lexical.Scanner(input)) match {
-      case Success(r, x) => Some(r)
-      case x =>
-        logDebug(s"Not recognized as DDL: $x")
-        None
+private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
+
+  def apply(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
+    try {
+      Some(apply(input))
+    } catch {
+      case _ if !exceptionOnError => None
+      case x: Throwable => throw x
     }
   }
 
   def parseType(input: String): DataType = {
+    lexical.initialize(reservedWords)
     phrase(dataType)(new lexical.Scanner(input)) match {
      case Success(r, x) => r
      case x =>
        sys.error(s"Unsupported dataType: $x")
    }
  }
 
-  protected case class Keyword(str: String)
-
-  protected implicit def asParser(k: Keyword): Parser[String] =
-    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
 
+  // Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
+  // properties of this class via reflection at runtime to construct the SqlLexical object.
   protected val CREATE = Keyword("CREATE")
   protected val TEMPORARY = Keyword("TEMPORARY")
   protected val TABLE = Keyword("TABLE")
@@ -80,17 +78,10 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected val MAP = Keyword("MAP")
   protected val STRUCT = Keyword("STRUCT")
 
-  // Use reflection to find the reserved words defined in this class.
-  protected val reservedWords =
-    this.getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = new SqlLexical(reservedWords)
-
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
+  protected def start: Parser[LogicalPlan] = ddl
+
   /**
    * `CREATE [TEMPORARY] TABLE avroTable
   * USING org.apache.spark.sql.avro
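As a caller-side usage sketch of the new two-argument `DDLParser.apply`: the hypothetical `FallbackParsing` wrapper below is not part of the patch, but it mirrors what `SQLContext.parseSql` and `HiveContext.sql` do in this commit; it must live under the `org.apache.spark.sql` package because `DDLParser` is `private[sql]`.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.SqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.DDLParser

// Hypothetical wrapper mirroring SQLContext.parseSql after this patch.
class FallbackParsing {
  private val ddlParser = new DDLParser
  private val sqlParser = new SqlParser

  def parseSql(sql: String): LogicalPlan =
    // With exceptionOnError = false the DDL parser turns parse failures into None,
    // so non-DDL statements fall through to the plain SQL parser instead of failing.
    ddlParser(sql, false).getOrElse(sqlParser(sql))
}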

sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala

Lines changed: 3 additions & 13 deletions
@@ -20,30 +20,20 @@ package org.apache.spark.sql.hive
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical}
+import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand}
 
 /**
  * A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions.
  */
 private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser {
-  protected implicit def asParser(k: Keyword): Parser[String] =
-    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
+  // Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
+  // properties of this class via reflection at runtime to construct the SqlLexical object.
   protected val ADD = Keyword("ADD")
   protected val DFS = Keyword("DFS")
   protected val FILE = Keyword("FILE")
   protected val JAR = Keyword("JAR")
 
-  private val reservedWords =
-    this
-      .getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = new SqlLexical(reservedWords)
-
   protected lazy val start: Parser[LogicalPlan] = dfs | addJar | addFile | hiveQl
 
   protected lazy val hiveQl: Parser[LogicalPlan] =

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     if (conf.dialect == "sql") {
       super.sql(sqlText)
     } else if (conf.dialect == "hiveql") {
-      new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText)))
+      new SchemaRDD(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(sqlText)))
     } else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
