
Commit 1f7d00a

Merge pull request apache#41 from marmbrus/splitComponents

Break catalyst into 3 major components and move everything into org.apache.spark.sql

2 parents: a7ad058 + 7588a57

11,374 files changed (+1330, −1198 lines)


.travis.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -9,4 +9,4 @@
     - $HOME/.ivy2
     - $HOME/.sbt
 script:
-  - "GIT_AUTHOR_NAME=\"Michael Armbrust\" GIT_AUTHOR_EMAIL=\"[email protected]\" GIT_COMMITTER_NAME=\"Michael Armbrust\" GIT_COMMITTER_EMAIL=\"[email protected]\" sbt ++$TRAVIS_SCALA_VERSION 'set scalacOptions += \"-Xfatal-warnings\"' test:compile scalastyle test ghpages-push-site"
+  - "GIT_AUTHOR_NAME=\"Michael Armbrust\" GIT_AUTHOR_EMAIL=\"[email protected]\" GIT_COMMITTER_NAME=\"Michael Armbrust\" GIT_COMMITTER_EMAIL=\"[email protected]\" sbt ++$TRAVIS_SCALA_VERSION 'set scalacOptions += \"-Xfatal-warnings\"' test:compile test scalastyle"
```

README.md

Lines changed: 18 additions & 12 deletions

````diff
@@ -61,30 +61,36 @@ Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_4
 Type in expressions to have them evaluated.
 Type :help for more information.
 
-scala> val query = "SELECT * FROM (SELECT * FROM src) a".q
-query: catalyst.execution.TestShark.SharkSqlQuery =
+scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
+query: org.apache.spark.sql.ExecutedQuery =
 SELECT * FROM (SELECT * FROM src) a
-== Logical Plan ==
-Project {key#0,value#1}
- Subquery a
-  Project {key#0,value#1}
-   MetastoreRelation default, src, None
+=== Query Plan ===
+Project [key#6:0.0,value#7:0.1]
+ HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None
+```
 
-== Physical Plan ==
-Project {key#0,value#1}
- HiveTableScan {key#0,value#1}, (MetastoreRelation default, src, None)
+Query results are RDDs and can be operated on as such.
+```
+scala> query.collect()
+res8: Array[org.apache.spark.sql.execution.Row] = Array([238,val_238], [86,val_86], [311,val_311]...
+```
+
+You can also build further queries on top of these RDDs using the query DSL.
+```
+scala> query.where('key === 100).toRdd.collect()
+res11: Array[org.apache.spark.sql.execution.Row] = Array([100,val_100], [100,val_100])
 ```
 
 From the console you can even write rules that transform query plans. For example, the above query has redundant project operators that aren't doing anything. This redundancy can be eliminated using the `transform` function that is available on all [`TreeNode`](http://databricks.github.io/catalyst/latest/api/#catalyst.trees.TreeNode) objects.
 ```scala
-scala> query.optimizedPlan
+scala> query.logicalPlan
 res1: catalyst.plans.logical.LogicalPlan =
 Project {key#0,value#1}
  Project {key#0,value#1}
   MetastoreRelation default, src, None
 
 
-scala> res0.optimizedPlan transform {
+scala> query.logicalPlan transform {
      | case Project(projectList, child) if projectList == child.output => child
      | }
 res2: catalyst.plans.logical.LogicalPlan =
````
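The `transform` example above can be packaged as a reusable rule so the rewrite participates in the normal rule-execution machinery instead of living in the REPL. A minimal sketch, assuming the `Rule[LogicalPlan]` API at its new `org.apache.spark.sql.catalyst` coordinates; the object name is ours, not part of the commit:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Drops any Project that emits exactly its child's output: such a projection
// does no work. `transform` applies the partial function to every matching
// node in the tree.
object EliminateRedundantProjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if projectList == child.output => child
  }
}
```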

build.sbt

Lines changed: 51 additions & 81 deletions

```diff
@@ -1,82 +1,52 @@
-import AssemblyKeys._ // put this at the top of the file
 
-name := "catalyst"
-
-organization := "com.databricks"
-
-version := "0.1-SNAPSHOT"
-
-scalaVersion := "2.10.3"
-
-scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked")
-
-resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
-
-// TODO: Remove when Spark 0.9.0 is released for real.
-resolvers += "SparkStaging" at "https://repository.apache.org/content/repositories/orgapachespark-1006/"
-
-libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"
-
-// Hive 0.10.0 relies on a weird version of jdo that is not published anywhere... Remove when we upgrade to 0.11.0
-libraryDependencies += "javax.jdo" % "jdo2-api" % "2.3-ec" from "http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar"
-
-libraryDependencies ++= Seq(
-  "org.apache.hadoop" % "hadoop-client" % "1.0.4",
-  "org.scalatest" %% "scalatest" % "1.9.1" % "test",
-  //"net.hydromatic" % "optiq-core" % "0.4.16-SNAPSHOT",
-  "org.apache.hive" % "hive-metastore" % "0.12.0",
-  "org.apache.hive" % "hive-exec" % "0.12.0",
-  "org.apache.hive" % "hive-serde" % "0.12.0",
-  "com.typesafe" %% "scalalogging-slf4j" % "1.0.1")
-
-org.scalastyle.sbt.ScalastylePlugin.Settings
-
-// Multiple queries rely on the TestShark singleton. See comments there for more details.
-parallelExecution in Test := false
-
-resolvers ++= Seq(
-  // For Optiq
-  "Conjars Repository" at "http://conjars.org/repo/",
-  // For jdo-2 required by Hive < 0.12.0
-  "Datanucleus Repository" at "http://www.datanucleus.org/downloads/maven2")
-
-resolvers += "Databees" at "http://repository-databricks.forge.cloudbees.com/snapshot/"
-
-initialCommands in console := """
-  import catalyst.analysis._
-  import catalyst.dsl._
-  import catalyst.errors._
-  import catalyst.expressions._
-  import catalyst.frontend._
-  import catalyst.plans.logical._
-  import catalyst.rules._
-  import catalyst.types._
-  import catalyst.util._
-  import catalyst.execution.TestShark._"""
-
-site.settings
-
-ghpages.settings
-
-git.remoteRepo := "[email protected]:databricks/catalyst.git"
-
-site.settings
-
-site.includeScaladoc()
-
-assemblySettings
-
-test in assembly := {}
-
-mergeStrategy in assembly := {
-  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
-  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
-  case "log4j.properties" => MergeStrategy.discard
-  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
-  case "reference.conf" => MergeStrategy.concat
-  case _ => MergeStrategy.first
-}
-
-scalacOptions in (Compile, doc) <++= (baseDirectory) map {
-  bd => Seq("-sourcepath", bd.getAbsolutePath, "-doc-source-url","https://github.com/databricks/catalyst/blob/master/€{FILE_PATH}.scala")
-}
+lazy val catalyst = Project("catalyst", file("catalyst"), settings = catalystSettings)
+lazy val core = Project("core", file("core"), settings = coreSettings).dependsOn(catalyst)
+lazy val shark = Project("shark", file("shark"), settings = sharkSettings).dependsOn(core)
+
+def sharedSettings = Defaults.defaultSettings ++ Seq(
+  organization := "org.apache.spark.sql",
+  version := "0.1-SNAPSHOT",
+  scalaVersion := "2.10.3",
+  scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked"),
+  // Common Dependencies.
+  libraryDependencies ++= Seq(
+    "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+    "com.typesafe" %% "scalalogging-slf4j" % "1.0.1")
+) ++ org.scalastyle.sbt.ScalastylePlugin.Settings
+
+def catalystSettings = sharedSettings ++ Seq(
+  name := "catalyst",
+  // The mechanics of rewriting expression ids to compare trees in some test cases make
+  // assumptions about the expression ids being contiguous. Running tests in parallel breaks
+  // this non-deterministically. TODO: FIX THIS.
+  parallelExecution in Test := false
+)
+
+def coreSettings = sharedSettings ++ Seq(
+  name := "core",
+  libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"
+)
+
+def sharkSettings = sharedSettings ++ Seq(
+  name := "shark",
+  libraryDependencies ++= Seq(
+    "org.apache.hadoop" % "hadoop-client" % "1.0.4",
+    "org.apache.hive" % "hive-metastore" % "0.12.0",
+    "org.apache.hive" % "hive-exec" % "0.12.0",
+    "org.apache.hive" % "hive-serde" % "0.12.0"),
+  // Multiple queries rely on the TestShark singleton. See comments there for more details.
+  parallelExecution in Test := false,
+  initialCommands in console :=
+    """
+      |import org.apache.spark.sql.catalyst.analysis._
+      |import org.apache.spark.sql.catalyst.dsl._
+      |import org.apache.spark.sql.catalyst.errors._
+      |import org.apache.spark.sql.catalyst.expressions._
+      |import org.apache.spark.sql.catalyst.plans.logical._
+      |import org.apache.spark.sql.catalyst.rules._
+      |import org.apache.spark.sql.catalyst.types._
+      |import org.apache.spark.sql.catalyst.util._
+      |import org.apache.spark.sql.execution
+      |import org.apache.spark.sql.shark._
+      |import org.apache.spark.sql.shark.TestShark._""".stripMargin
+)
```
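With the build split into three sbt projects, tasks can now be scoped to a single module. A usage sketch with standard sbt commands (a session of ours, not part of the commit):

```
$ sbt
> project catalyst
> test        # catalyst tests only, run serially per the comment above
> project shark
> console     # Scala REPL preloaded with the TestShark imports above
```

Because `shark` depends on `core`, which in turn depends on `catalyst`, the shark console puts the entire stack on the classpath.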

src/main/scala/catalyst/analysis/Analyzer.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 32 deletions

```diff
@@ -1,10 +1,10 @@
+package org.apache.spark.sql
 package catalyst
 package analysis
 
 import expressions._
 import plans.logical._
 import rules._
-import catalyst.execution.MetastoreRelation
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
@@ -36,7 +36,6 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       StarExpansion ::
       ResolveFunctions ::
       GlobalAggregates ::
-      PreInsertionCasts ::
       typeCoercionRules :_*)
   )
 
@@ -165,34 +164,4 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
   protected def containsStar(exprs: Seq[Expression]): Boolean =
     exprs.collect { case _: Star => true }.nonEmpty
 }
-
-/**
- * Casts input data to correct data types according to table definition before inserting into
- * that table.
- */
-object PreInsertionCasts extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
-    // Wait until children are resolved
-    case p: LogicalPlan if !p.childrenResolved => p
-
-    case p @ InsertIntoTable(table: MetastoreRelation, _, child) =>
-      val childOutputDataTypes = child.output.map(_.dataType)
-      // Only check attributes, not partitionKeys since they are always strings.
-      // TODO: Fully support inserting into partitioned tables.
-      val tableOutputDataTypes = table.attributes.map(_.dataType)
-
-      if (childOutputDataTypes == tableOutputDataTypes) {
-        p
-      } else {
-        // Only do the casting when child output data types differ from table output data types.
-        val castedChildOutput = child.output.zip(table.output).map {
-          case (input, table) if input.dataType != table.dataType =>
-            Alias(Cast(input, table.dataType), input.name)()
-          case (input, _) => input
-        }
-
-        p.copy(child = Project(castedChildOutput, child))
-      }
-  }
-}
 }
```

src/main/scala/catalyst/analysis/Catalog.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 20 additions & 0 deletions

```diff
@@ -1,7 +1,9 @@
+package org.apache.spark.sql
 package catalyst
 package analysis
 
 import plans.logical.LogicalPlan
+import scala.collection.mutable
 
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
@@ -13,6 +15,24 @@ trait Catalog {
       alias: Option[String] = None): LogicalPlan
 }
 
+trait OverrideCatalog extends Catalog {
+
+  // TODO: This doesn't work when the database changes...
+  val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
+
+  abstract override def lookupRelation(
+    databaseName: Option[String],
+    tableName: String,
+    alias: Option[String] = None): LogicalPlan = {
+
+    overrides.get((databaseName, tableName))
+      .getOrElse(super.lookupRelation(databaseName, tableName, alias))
+  }
+
+  def overrideTable(databaseName: Option[String], tableName: String, plan: LogicalPlan) =
+    overrides.put((databaseName, tableName), plan)
+}
+
 /**
  * A trivial catalog that returns an error when a relation is requested. Used for testing when all
  * relations are already filled in and the analyser needs only to resolve attribute references.
```
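The `abstract override` makes `OverrideCatalog` a stackable trait: mixed into any concrete `Catalog`, its lookup is consulted first and falls back to the underlying implementation via `super`. A minimal sketch of that wiring; the `SimpleCatalog` class and the plan values in the comments are ours, for illustration only:

```scala
import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// A toy base catalog backed by an immutable map; a real implementation would
// consult a metastore instead.
class SimpleCatalog(tables: Map[String, LogicalPlan]) extends Catalog {
  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String]): LogicalPlan = tables(tableName)
}

// Mixing in OverrideCatalog layers the mutable override table on top:
//   val catalog = new SimpleCatalog(Map("src" -> srcPlan)) with OverrideCatalog
//   catalog.overrideTable(None, "src", cachedPlan)
//   catalog.lookupRelation(None, "src")   // now returns cachedPlan
```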

src/main/scala/catalyst/analysis/FunctionRegistry.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -1,3 +1,4 @@
+package org.apache.spark.sql
 package catalyst
 package analysis
 
```

src/main/scala/catalyst/analysis/HiveTypeCoercion.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -1,3 +1,4 @@
+package org.apache.spark.sql
 package catalyst
 package analysis
 
```

src/main/scala/catalyst/analysis/unresolved.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -1,3 +1,4 @@
+package org.apache.spark.sql
 package catalyst
 package analysis
 
@@ -10,14 +11,14 @@ import trees.TreeNode
  * resolved.
  */
 class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends
-  errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object")
+  errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
 
 /**
  * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
 */
 case class UnresolvedRelation(
     databaseName: Option[String],
-    name: String,
+    tableName: String,
     alias: Option[String] = None) extends BaseRelation {
   def output = Nil
   override lazy val resolved = false
```

src/main/scala/catalyst/dsl/package.scala renamed to catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -1,3 +1,4 @@
+package org.apache.spark.sql
 package catalyst
 
 import scala.language.implicitConversions
@@ -118,10 +119,10 @@ package object dsl {
 
     def unionAll(otherPlan: LogicalPlan) = Union(plan, otherPlan)
 
-    def filter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
+    def sfilter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
       Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), plan)
 
-    def filter(dynamicUdf: (DynamicRow) => Boolean) =
+    def sfilter(dynamicUdf: (DynamicRow) => Boolean) =
       Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(plan.output))), plan)
 
     def sample(
```
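The rename from `filter` to `sfilter` presumably avoids a collision with the ordinary `filter` now that query results are RDDs (see the README changes above). A minimal usage sketch, assuming the implicit conversions in this dsl package object apply to any `LogicalPlan`; the relation name and predicate are ours:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl._

// Builds a logical Filter node wrapping a typed Scala UDF over the `key`
// attribute; nothing executes until the plan is analyzed and run.
val plan = UnresolvedRelation(None, "src")
val filtered = plan.sfilter('key)((key: Int) => key > 10)
```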
