
[SQL] Update SparkSQL and ScalaTest in branch-1.0 to match master. #1078

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -235,7 +235,7 @@
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.util.Random

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.{PatienceConfiguration, Eventually}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

@@ -76,7 +76,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
tester.assertCleanup()

// Verify that shuffles can be re-executed after cleaning up
assert(rdd.collect().toList === collected)
assert(rdd.collect().toList.equals(collected))
}

test("cleanup broadcast") {
@@ -285,7 +285,7 @@ class CleanerTester(
sc.cleaner.get.attachListener(cleanerListener)

/** Assert that all the stuff has been cleaned up */
def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
assert(isAllCleanedUp)
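
Note: the hunk above tracks a ScalaTest API move. In ScalaTest 2.x the timeout type accepted by eventually is PatienceConfiguration.Timeout (it was Eventually.Timeout in 1.9.x). A minimal standalone sketch of the 2.x usage; the allCleanedUp predicate is illustrative only:

    import org.scalatest.concurrent.PatienceConfiguration
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._
    import scala.language.postfixOps

    object EventuallyTimeoutSketch extends App {
      // Stand-in for the suite's real cleanup predicate.
      def allCleanedUp: Boolean = true

      // ScalaTest 2.x: the timeout passed to eventually is a PatienceConfiguration.Timeout.
      val waitTimeout: PatienceConfiguration.Timeout = timeout(10 seconds)
      eventually(waitTimeout, interval(100 millis)) {
        assert(allCleanedUp)
      }
    }
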
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -23,11 +23,11 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {

// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.

override def beforeAll(configMap: Map[String, Any]) {
override def beforeAll() {
System.setProperty("spark.shuffle.use.netty", "true")
}

override def afterAll(configMap: Map[String, Any]) {
override def afterAll() {
System.setProperty("spark.shuffle.use.netty", "false")
}
}
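
The signature change above follows ScalaTest 2.x, where BeforeAndAfterAll's beforeAll()/afterAll() hooks take no configMap argument. A minimal sketch of the 2.x pattern, mirroring the property toggling used by this suite:

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    // Sketch only: the lifecycle hooks are parameterless under ScalaTest 2.x.
    class NettyFlagSketchSuite extends FunSuite with BeforeAndAfterAll {
      override def beforeAll() {
        System.setProperty("spark.shuffle.use.netty", "true")
      }
      override def afterAll() {
        System.setProperty("spark.shuffle.use.netty", "false")
      }
      test("netty shuffle flag is set for the suite") {
        assert(sys.props("spark.shuffle.use.netty") == "true")
      }
    }
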
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -275,8 +275,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {

// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
asInstanceOf[ShuffledRDD[_, _, _]] != null
assert(isEquals)

// when shuffling, we can increase the number of partitions
val coalesced6 = data.coalesce(20, shuffle = true)
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,7 +23,7 @@ import scala.language.reflectiveCalls

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter, FunSuiteLike}

import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -37,7 +37,7 @@ class BuggyDAGEventProcessActor extends Actor {
}
}

class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {

val conf = new SparkConf
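
The FunSuite to FunSuiteLike switch above is needed because the suite already extends the TestKit class; in ScalaTest 2.x the FunSuite style is a class as well, so only its FunSuiteLike trait can still be mixed in. A minimal sketch of the pattern (suite name and test body are illustrative):

    import akka.actor.ActorSystem
    import akka.testkit.TestKit
    import org.scalatest.FunSuiteLike

    // TestKit is a class, so the ScalaTest style has to come in as a trait.
    class ActorSuiteSketch extends TestKit(ActorSystem("ActorSuiteSketch")) with FunSuiteLike {
      test("the actor system carries the expected name") {
        assert(system.name == "ActorSuiteSketch")
      }
    }
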
core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -105,7 +105,8 @@ class TimeStampedHashMapSuite extends FunSuite {
map("k1") = strongRef
map("k2") = "v2"
map("k3") = "v3"
assert(map("k1") === strongRef)
val isEquals = map("k1") == strongRef
assert(isEquals)

// clear strong reference to "k1"
strongRef = null
15 changes: 11 additions & 4 deletions pom.xml
@@ -458,25 +458,31 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>1.9.1</version>
<version>2.1.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<artifactId>easymockclassextension</artifactId>
<version>3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<version>1.10.0</version>
<version>1.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -778,6 +784,7 @@
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-language:postfixOps</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
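
The new -language:postfixOps flag matches postfix syntax already present in the test sources, for example interval(100 millis) in ContextCleanerSuite above; under Scala 2.10 such calls otherwise trigger a feature warning unless the feature is imported explicitly. A small sketch showing the two equivalent ways to enable it:

    import scala.language.postfixOps        // per-file alternative to the -language:postfixOps flag
    import org.scalatest.time.SpanSugar._

    object PostfixOpsSketch extends App {
      val pollInterval = 100 millis          // postfix notation, same value as 100.millis
      println(pollInterval)
    }
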
22 changes: 11 additions & 11 deletions project/SparkBuild.scala
@@ -270,16 +270,17 @@ object SparkBuild extends Build {
*/

libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % jettyVersion,
"org.eclipse.jetty" % "jetty-util" % jettyVersion,
"org.eclipse.jetty" % "jetty-plus" % jettyVersion,
"org.eclipse.jetty" % "jetty-security" % jettyVersion,
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test"
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % jettyVersion,
"org.eclipse.jetty" % "jetty-util" % jettyVersion,
"org.eclipse.jetty" % "jetty-plus" % jettyVersion,
"org.eclipse.jetty" % "jetty-security" % jettyVersion,
"org.scalatest" %% "scalatest" % "2.1.5" % "test",
"org.scalacheck" %% "scalacheck" % "1.11.3" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymockclassextension" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.9.0" % "test",
"junit" % "junit" % "4.10" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -476,7 +477,6 @@ object SparkBuild extends Build {
// this non-deterministically. TODO: FIX THIS.
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
)
)
6 changes: 4 additions & 2 deletions repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -57,12 +57,14 @@ class ReplSuite extends FunSuite {
}

def assertContains(message: String, output: String) {
assert(output.contains(message),
val isContain = output.contains(message)
assert(isContain,
"Interpreter output did not contain '" + message + "':\n" + output)
}

def assertDoesNotContain(message: String, output: String) {
assert(!output.contains(message),
val isContain = output.contains(message)
assert(!isContain,
"Interpreter output contained '" + message + "':\n" + output)
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType

/**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
def output: Seq[Attribute] = Seq.empty
}

/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
case class NativeCommand(cmd: String) extends Command
case class NativeCommand(cmd: String) extends Command {
override def output =
Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
}

/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)()
)
BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}

/**
* Returned by a parser when the users only wants to see what query plan would be executed, without
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
override def output =
Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}

/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {

comparePlans(optimized, correctAnswer)
}

test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
45 changes: 10 additions & 35 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -220,17 +213,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated out after filtering has been done.
*
* The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
* away by the filter pushdown optimization.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

val projectSet = projectList.flatMap(_.references).toSet
val filterSet = filterPredicates.flatMap(_.references).toSet
val filterCondition = filterPredicates.reduceLeftOption(And)
val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)

// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
@@ -255,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val planner = new SparkPlanner

@transient
protected[sql] lazy val emptyResult =
sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)

/**
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -276,35 +272,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan

def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
case SetCommand(key, value) =>
// Only this case needs to be executed eagerly. The other cases will
// be taken care of when the actual results are being extracted.
// In the case of HiveContext, sqlConf is overridden to also pass the
// pair into its HiveConf.
if (key.isDefined && value.isDefined) {
set(key.get, value.get)
}
// It doesn't matter what we return here, since this is only used
// to force the evaluation to happen eagerly. To query the results,
// one must use SchemaRDD operations to extract them.
emptyResult
case _ => executedPlan.execute()
}

lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = {
logical match {
case s: SetCommand => eagerlyProcess(s)
case _ => executedPlan.execute()
}
}
lazy val toRdd: RDD[Row] = executedPlan.execute()

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -326,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* TODO: We only support primitive types, add support for nested types.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
val schema = rdd.first.map { case (fieldName, obj) =>
val schema = rdd.first().map { case (fieldName, obj) =>
val dataType = obj.getClass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
@AlphaComponent
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient protected[spark] val logicalPlan: LogicalPlan)
@transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

def baseSchemaRDD = this
15 changes: 13 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkLogicalPlan

/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
*/
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient protected[spark] val logicalPlan: LogicalPlan
@transient val baseLogicalPlan: LogicalPlan

private[sql] def baseSchemaRDD: SchemaRDD

@@ -48,7 +49,17 @@
*/
@transient
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
SparkLogicalPlan(queryExecution.executedPlan)
case _ =>
baseLogicalPlan
}

override def toString =
s"""${super.toString}
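
Taken together, the SQLContext and SchemaRDDLike changes keep sql() itself lazy while still forcing command-style plans (SET, DDL, inserts) to run eagerly when the SchemaRDD is constructed. A hypothetical usage sketch; the SparkContext sc, the registered people table, and the SET key are assumed for illustration only:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)                   // `sc`: an existing SparkContext

    // Command plans execute eagerly while the SchemaRDD is built, so the SET side
    // effect happens immediately even though no action is called on the result.
    sqlContext.sql("SET spark.sql.example.key=value")

    // Ordinary queries stay lazy: this line only builds the logical plan...
    val names = sqlContext.sql("SELECT name FROM people")

    // ...and execution happens once an action such as collect() is invoked.
    names.collect()
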