From bac3a6bc05fecca9d7ebb3e544b2edcfdca1c50d Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 8 Apr 2015 17:48:22 -0700
Subject: [PATCH 1/3] fix Date serialization

---
 .../scala/org/apache/spark/api/r/SerDe.scala        |  7 +++++--
 .../scala/org/apache/spark/util}/DateUtils.scala    |  6 +-----
 .../apache/spark/repl/SparkJLineCompletion.scala    | 16 ++++++----------
 .../main/scala/org/apache/spark/sql/Row.scala       |  2 +-
 .../spark/sql/catalyst/ScalaReflection.scala        |  2 +-
 .../spark/sql/catalyst/expressions/Cast.scala       |  5 +++--
 .../expressions/codegen/CodeGenerator.scala         |  6 +++++-
 .../sql/catalyst/expressions/literals.scala         |  1 +
 .../expressions/ExpressionEvaluationSuite.scala     |  2 +-
 .../apache/spark/sql/execution/pythonUdfs.scala     |  1 +
 .../org/apache/spark/sql/json/JsonRDD.scala         |  1 +
 .../org/apache/spark/sql/json/JsonSuite.scala       |  3 +--
 .../spark/sql/parquet/ParquetIOSuite.scala          |  1 +
 .../apache/spark/sql/hive/HiveInspectors.scala      |  1 +
 .../org/apache/spark/sql/hive/TableReader.scala     |  2 +-
 15 files changed, 30 insertions(+), 26 deletions(-)
 rename {sql/catalyst/src/main/scala/org/apache/spark/sql/types => core/src/main/scala/org/apache/spark/util}/DateUtils.scala (91%)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 4ba6feb868fd3..5b68f96ab5a3c 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -22,6 +22,8 @@ import java.sql.{Date, Time}
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.DateUtils
+
 /**
  * Utility functions to serialize, deserialize objects to / from R
  */
@@ -101,7 +103,7 @@ private[spark] object SerDe {
 
   def readDate(in: DataInputStream): Date = {
     val d = in.readInt()
-    new Date(d.toLong * 24 * 3600 * 1000)
+    DateUtils.toJavaDate(d)
   }
 
   def readTime(in: DataInputStream): Time = {
@@ -277,7 +279,8 @@ private[spark] object SerDe {
   }
 
   def writeDate(out: DataOutputStream, value: Date): Unit = {
-    out.writeInt((value.getTime / 1000 / 3600 / 24).toInt)
+    val d = DateUtils.fromJavaDate(value)
+    out.writeInt(d)
   }
 
   def writeTime(out: DataOutputStream, value: Time): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/core/src/main/scala/org/apache/spark/util/DateUtils.scala
similarity index 91%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
rename to core/src/main/scala/org/apache/spark/util/DateUtils.scala
index 8a1a3b81b3d2c..0c4e07b6c7b5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DateUtils.scala
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.types
+package org.apache.spark.util
 
 import java.sql.Date
 import java.util.{Calendar, TimeZone}
 
-import org.apache.spark.sql.catalyst.expressions.Cast
-
 /**
  * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
  */
@@ -55,6 +53,4 @@ object DateUtils {
   def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
     new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
   }
-
-  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
 }
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
index f24d6da72437e..c8da3fe8bbfd9 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
@@ -7,16 +7,12 @@
 
 package org.apache.spark.repl
 
-import org.apache.spark.annotation.DeveloperApi
-
-import scala.tools.nsc._
+import scala.collection.mutable.ListBuffer
+import scala.tools.nsc.interpreter.Completion._
 import scala.tools.nsc.interpreter._
-import scala.tools.jline._
-import scala.tools.jline.console.completer._
-import Completion._
-import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 
 /**
  * Represents an auto-completion tool for the supplied interpreter that
@@ -33,10 +29,10 @@ class SparkJLineCompletion(val intp: SparkIMain) extends Completion with Complet
 
   val global: intp.global.type = intp.global
   import global._
-  import definitions.{ PredefModule, AnyClass, AnyRefClass, ScalaPackage, JavaLangPackage }
-  import rootMirror.{ RootClass, getModuleIfDefined }
+  import definitions.{AnyClass, AnyRefClass, JavaLangPackage, PredefModule, ScalaPackage}
+  import rootMirror.{RootClass, getModuleIfDefined}
   type ExecResult = Any
-  import intp.{ debugging }
+  import intp.debugging
 
   /**
    * Represents the level of verbosity. Increments with consecutive tabs.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index d794f034f5578..fb2346c1b1831 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.types.{StructType, DateUtils}
+import org.apache.spark.sql.types.StructType
 
 object Row {
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 8bfd0471d9c7a..1a323bd48be43 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
 
 import java.sql.Timestamp
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, DateUtils}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 31f1a5fdc7e53..581b2f1ecdb30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,8 +21,8 @@ import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 /** Cast the child expression to the target data type. */
 case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
-    case DateType => buildCast[Int](_, d => DateUtils.toString(d))
+    case DateType => buildCast[Int](_, d =>
+      Cast.threadLocalDateFormat.get.format(DateUtils.toJavaDate(d)))
     case TimestampType => buildCast[Timestamp](_, timestampToString)
     case _ => buildCast[Any](_, _.toString)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index d1abf3c0b64a5..5319ad00b65ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -247,7 +247,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
         """.children
 
       case Cast(child @ DateType(), StringType) =>
-        child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
+        child.castOrNull(c =>
+          q"""org.apache.spark.sql.catalyst.expressions.Cast.threadLocalDateFormat.get.format(
+                org.apache.spark.util.DateUtils.toJavaDate($c.toInt))
+           """,
+          StringType)
 
       case Cast(child @ NumericType(), IntegerType) =>
         child.castOrNull(c => q"$c.toInt", IntegerType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 0e2d593e94124..2ff9f2bfb3955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 object Literal {
   def apply(v: Any): Literal = v match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 3dbefa40d2808..997dfdd6be044 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.Matchers._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField
 import org.apache.spark.sql.types._
-
+import org.apache.spark.util.DateUtils
 
 class ExpressionEvaluationBaseSuite extends FunSuite {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 5b308d88d4cdf..8d1a3192c522d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types._
 import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.util.DateUtils
 
 /**
  * A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0b770f2251943..cf15223177c5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types._
 import org.apache.spark.Logging
+import org.apache.spark.util.DateUtils
 
 private[sql] object JsonRDD extends Logging {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 706c966ee05f5..60fd25b1c9980 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -22,7 +22,6 @@ import java.sql.{Date, Timestamp}
 import org.scalactic.Tolerance._
 
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.json.JsonRDD.{compatibleType, enforceCorrectType}
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
@@ -30,7 +29,7 @@ import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{DateUtils, Utils}
 
 class JsonSuite extends QueryTest {
   import org.apache.spark.sql.json.TestJsonData._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 203bc79f153dd..9536a31a828f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
+import org.apache.spark.util.DateUtils
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 921c6194c7b76..ae62eff03c53a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.{io => hadoopIo}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 3563472c7ae81..17266021ee1b8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,7 +34,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DateUtils
+import org.apache.spark.util.DateUtils
 
 /**
  * A trait for subclasses that handle table scans.

From 5f7a010dda26049048f2b1adc316e1b855f176ae Mon Sep 17 00:00:00 2001
From: hlin09
Date: Wed, 8 Apr 2015 22:22:07 -0400
Subject: [PATCH 2/3] Fixes combineByKey.

---
 R/pkg/R/pairRDD.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index c2396c32a7548..fa0b1cfe0f3a6 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -428,7 +428,7 @@ setMethod("combineByKey",
         pred <- function(item) exists(item$hash, keys)
         lapply(part,
                function(item) {
-                 item$hash <- as.character(item[[1]])
+                 item$hash <- as.character(hashCode(item[[1]]))
                  updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
                })
         convertEnvsToList(keys, combiners)
@@ -441,7 +441,7 @@ setMethod("combineByKey",
         pred <- function(item) exists(item$hash, keys)
        lapply(part,
                function(item) {
-                 item$hash <- as.character(item[[1]])
+                 item$hash <- as.character(hashCode(item[[1]]))
                  updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
                })
         convertEnvsToList(keys, combiners)

From 7fe7dcbd9084cf39b4d21ca40212b65e541209e1 Mon Sep 17 00:00:00 2001
From: hlin09
Date: Wed, 8 Apr 2015 23:47:35 -0400
Subject: [PATCH 3/3] Adds tests for combineByKey.

---
 R/pkg/inst/tests/test_shuffle.R | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/R/pkg/inst/tests/test_shuffle.R b/R/pkg/inst/tests/test_shuffle.R
index d1da8232aea81..b58b6a1a62080 100644
--- a/R/pkg/inst/tests/test_shuffle.R
+++ b/R/pkg/inst/tests/test_shuffle.R
@@ -87,6 +87,18 @@ test_that("combineByKey for doubles", {
   expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
 })
 
+test_that("combineByKey for characters", {
+  stringKeyRDD <- parallelize(sc,
+                              list(list("max", 1L), list("min", 2L),
+                                   list("other", 3L), list("max", 4L)), 2L)
+  reduced <- combineByKey(stringKeyRDD,
+                          function(x) { x }, "+", "+", 2L)
+  actual <- collect(reduced)
+
+  expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
 test_that("aggregateByKey", {
   # test aggregateByKey for int keys
   rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
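
A minimal R sketch of the day-count convention adopted in PATCH 1/3 (illustration only, not part of
the patch; the helper names writeDateAsDays and readDateFromDays are made up). SerDe.writeDate now
sends a date as an integer number of days since 1970-01-01 via DateUtils.fromJavaDate, and
SerDe.readDate rebuilds the java.sql.Date from that integer via DateUtils.toJavaDate. R's own Date
class uses the same day count, so the round trip on the R side looks like this:

# days since 1970-01-01: the integer that crosses the R <-> JVM socket
writeDateAsDays <- function(d) {
  as.integer(as.numeric(as.Date(d)))
}

# inverse conversion, analogous to DateUtils.toJavaDate on the JVM side
readDateFromDays <- function(days) {
  as.Date(days, origin = "1970-01-01")
}

d <- as.Date("2015-04-08")
days <- writeDateAsDays(d)     # 16533
readDateFromDays(days) == d    # TRUE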
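
A second sketch (illustration only) for the combineByKey change in PATCH 2/3. The shuffle code keeps
per-key combiners in an R environment and probes it with exists(item$hash, keys). With a raw
character key such as "max" (one of the keys exercised by the new test in PATCH 3/3), exists() can
report a hit even though nothing was stored, because by default it also searches enclosing
environments and finds base::max. Hashing the key first, as the patch does with hashCode(), stores a
numeric string that cannot collide with such names. The sum(utf8ToInt(...)) below is only a stand-in
for SparkR's hashCode():

keys <- new.env()
exists("max", envir = keys)                     # TRUE: finds base::max in a parent environment
exists("max", envir = keys, inherits = FALSE)   # FALSE: the key is really absent

hashed <- as.character(sum(utf8ToInt("max")))   # stand-in for as.character(hashCode("max"))
assign(hashed, 5L, envir = keys)
exists(hashed, envir = keys)                    # TRUE, and unambiguous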