
Fixes combineByKey. #9

Closed · wants to merge 3 commits

4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
@@ -428,7 +428,7 @@ setMethod("combineByKey",
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
- item$hash <- as.character(item[[1]])
+ item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
})
convertEnvsToList(keys, combiners)
@@ -441,7 +441,7 @@
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
- item$hash <- as.character(item[[1]])
+ item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
})
convertEnvsToList(keys, combiners)
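The two hunks above key the per-partition `keys`/`combiners` environments by `as.character(hashCode(item[[1]]))` instead of the raw key, so string and other non-numeric keys go through the same integer hash as the rest of the pair-RDD code. A rough Scala sketch of the idea follows; it is hypothetical, not SparkR code, and assumes the string hash follows the java.lang.String.hashCode convention that SparkR's `hashCode()` is understood to mirror.

```scala
import scala.collection.mutable

// Hypothetical sketch: accumulate combiners per partition in a map keyed by a
// string-ified integer hash of the key, the way the R code keys its
// `keys`/`combiners` environments by as.character(hashCode(key)).
object CombineByKeySketch {
  // Same recurrence as java.lang.String.hashCode: h = 31*h + char.
  def stringHash(s: String): Int =
    s.foldLeft(0)((h, c) => 31 * h + c.toInt)

  def combinePartition[K, V, C](part: Iterator[(K, V)],
                                createCombiner: V => C,
                                mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.Map.empty[String, (K, C)]
    part.foreach { case (k, v) =>
      val hash = k match {
        case s: String => stringHash(s).toString
        case other     => other.hashCode.toString
      }
      combiners.get(hash) match {
        case Some((key, c)) => combiners(hash) = (key, mergeValue(c, v))
        case None           => combiners(hash) = (k, createCombiner(v))
      }
    }
    combiners.valuesIterator
  }
}
```

The new test added to test_shuffle.R below exercises exactly this path with the string keys "max", "min", and "other".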
12 changes: 12 additions & 0 deletions R/pkg/inst/tests/test_shuffle.R
@@ -87,6 +87,18 @@ test_that("combineByKey for doubles", {
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

test_that("combineByKey for characters", {
stringKeyRDD <- parallelize(sc,
list(list("max", 1L), list("min", 2L),
list("other", 3L), list("max", 4L)), 2L)
reduced <- combineByKey(stringKeyRDD,
function(x) { x }, "+", "+", 2L)
actual <- collect(reduced)

expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

test_that("aggregateByKey", {
# test aggregateByKey for int keys
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -22,6 +22,8 @@ import java.sql.{Date, Time}

import scala.collection.JavaConversions._

+ import org.apache.spark.util.DateUtils

/**
* Utility functions to serialize, deserialize objects to / from R
*/
@@ -101,7 +103,7 @@

def readDate(in: DataInputStream): Date = {
val d = in.readInt()
- new Date(d.toLong * 24 * 3600 * 1000)
+ DateUtils.toJavaDate(d)
}

def readTime(in: DataInputStream): Time = {
@@ -277,7 +279,8 @@
}

def writeDate(out: DataOutputStream, value: Date): Unit = {
- out.writeInt((value.getTime / 1000 / 3600 / 24).toInt)
+ val d = DateUtils.fromJavaDate(value)
+ out.writeInt(d)
}

def writeTime(out: DataOutputStream, value: Time): Unit = {
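With this change, `readDate`/`writeDate` still ship a date between R and the JVM as a single int of days since 1970-01-01, but the int ↔ `java.sql.Date` conversion is delegated to `DateUtils` instead of raw millisecond arithmetic. A minimal round-trip sketch of that wire format is below; the helper names `toJavaDate`/`fromJavaDate` come from the diff, while the stream plumbing is illustrative only.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.sql.Date
import org.apache.spark.util.DateUtils  // as introduced by this PR

// Illustrative round trip of the SerDe date encoding: a date crosses the
// R <-> JVM boundary as an Int counting days since 1970-01-01.
object DateSerDeSketch {
  def write(date: Date): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(DateUtils.fromJavaDate(date)) // days since epoch, not millis
    out.flush()
    bytes.toByteArray
  }

  def read(payload: Array[Byte]): Date = {
    val in = new DataInputStream(new ByteArrayInputStream(payload))
    DateUtils.toJavaDate(in.readInt())
  }
}
```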
@@ -15,13 +15,11 @@
* limitations under the License.
*/

- package org.apache.spark.sql.types
+ package org.apache.spark.util

import java.sql.Date
import java.util.{Calendar, TimeZone}

- import org.apache.spark.sql.catalyst.expressions.Cast

/**
* helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
*/
@@ -55,6 +53,4 @@ object DateUtils {
def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
}

- def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
}
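Moving DateUtils from org.apache.spark.sql.types into org.apache.spark.util lets core's SerDe use it without pulling in Catalyst, which is also why the Cast import and the Cast-backed `toString` helper are dropped from the object. The reason to go through DateUtils rather than plain millisecond arithmetic is that `java.sql.Date` is interpreted in the JVM's default time zone. The sketch below shows the kind of offset-aware conversion involved; it is a simplification inferred from the Calendar/TimeZone imports above, not the exact implementation.

```scala
import java.sql.Date
import java.util.TimeZone

// Simplified sketch of a time-zone-aware days-since-epoch conversion.
// Plain `new Date(days * 86400000L)` lands on the intended calendar day only
// when the JVM runs in UTC; shifting by the local offset keeps the day number
// stable in any zone (DST and pre-1970 rounding edge cases ignored here).
object DateUtilsSketch {
  private val MillisPerDay = 24L * 60 * 60 * 1000

  def toJavaDate(daysSinceEpoch: Int): Date = {
    val millisUtc = daysSinceEpoch * MillisPerDay
    new Date(millisUtc - TimeZone.getDefault.getOffset(millisUtc))
  }

  def fromJavaDate(date: Date): Int = {
    val millis = date.getTime
    ((millis + TimeZone.getDefault.getOffset(millis)) / MillisPerDay).toInt
  }
}
```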
@@ -7,16 +7,12 @@

package org.apache.spark.repl

import org.apache.spark.annotation.DeveloperApi

import scala.tools.nsc._
import scala.collection.mutable.ListBuffer
import scala.tools.nsc.interpreter.Completion._
import scala.tools.nsc.interpreter._

import scala.tools.jline._
import scala.tools.jline.console.completer._
import Completion._
import scala.collection.mutable.ListBuffer
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi

/**
* Represents an auto-completion tool for the supplied interpreter that
@@ -33,10 +29,10 @@ class SparkJLineCompletion(val intp: SparkIMain) extends Completion with Complet
val global: intp.global.type = intp.global

import global._
- import definitions.{ PredefModule, AnyClass, AnyRefClass, ScalaPackage, JavaLangPackage }
- import rootMirror.{ RootClass, getModuleIfDefined }
+ import definitions.{AnyClass, AnyRefClass, JavaLangPackage, PredefModule, ScalaPackage}
+ import rootMirror.{RootClass, getModuleIfDefined}
type ExecResult = Any
- import intp.{ debugging }
+ import intp.debugging

/**
* Represents the level of verbosity. Increments with consecutive tabs.
2 changes: 1 addition & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.catalyst.expressions.GenericRow
- import org.apache.spark.sql.types.{StructType, DateUtils}
+ import org.apache.spark.sql.types.StructType

object Row {
/**
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst

import java.sql.Timestamp

- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{Utils, DateUtils}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._
@@ -21,8 +21,8 @@ import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}

import org.apache.spark.Logging
- import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.types._
+ import org.apache.spark.util.DateUtils

/** Cast the child expression to the target data type. */
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
- case DateType => buildCast[Int](_, d => DateUtils.toString(d))
+ case DateType => buildCast[Int](_, d =>
+   Cast.threadLocalDateFormat.get.format(DateUtils.toJavaDate(d)))
case TimestampType => buildCast[Timestamp](_, timestampToString)
case _ => buildCast[Any](_, _.toString)
}
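`castToString` now converts the stored day count to a `java.sql.Date` and formats it with `Cast.threadLocalDateFormat`, replacing the `DateUtils.toString` helper removed above. A thread-local instance is the usual pattern here because `SimpleDateFormat` keeps mutable internal state and is not thread-safe. A small sketch of that pattern follows; the field name `threadLocalDateFormat` comes from the diff, while the body shown is an assumption.

```scala
import java.text.{DateFormat, SimpleDateFormat}

// Sketch of the thread-local formatter pattern: sharing one SimpleDateFormat
// across threads/tasks is unsafe, so each thread lazily gets its own copy.
object ThreadLocalDateFormatSketch {
  val threadLocalDateFormat: ThreadLocal[DateFormat] = new ThreadLocal[DateFormat] {
    override def initialValue(): DateFormat = new SimpleDateFormat("yyyy-MM-dd")
  }

  def format(date: java.sql.Date): String = threadLocalDateFormat.get.format(date)
}
```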
@@ -247,7 +247,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
""".children

case Cast(child @ DateType(), StringType) =>
child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
child.castOrNull(c =>
q"""org.apache.spark.sql.catalyst.expressions.Cast.threadLocalDateFormat.get.format(
org.apache.spark.util.DateUtils.toJavaDate($c.toInt))
""",
StringType)

case Cast(child @ NumericType(), IntegerType) =>
child.castOrNull(c => q"$c.toInt", IntegerType)
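The generated cast takes the same threadLocalDateFormat/toJavaDate path, but written with fully qualified names: trees built with quasiquotes are compiled later, outside this source file, so they cannot rely on the imports at the top of CodeGenerator.scala. A toy illustration of building such a tree is below; the quoted expression is the one from the hunk above, the surrounding code is illustrative only.

```scala
import scala.reflect.runtime.universe._

object QuasiquoteSketch {
  // Stand-in for the tree holding the Int day count inside the generated code.
  val c: Tree = q"dateValue"

  // Fully qualified references survive compilation of the generated snippet,
  // which happens without CodeGenerator.scala's own imports in scope.
  val castToString: Tree =
    q"""org.apache.spark.sql.catalyst.expressions.Cast.threadLocalDateFormat.get.format(
          org.apache.spark.util.DateUtils.toJavaDate($c.toInt))"""
}
```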
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.types._
+ import org.apache.spark.util.DateUtils

object Literal {
def apply(v: Any): Literal = v match {
@@ -28,7 +28,7 @@ import org.scalatest.Matchers._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField
import org.apache.spark.sql.types._

+ import org.apache.spark.util.DateUtils

class ExpressionEvaluationBaseSuite extends FunSuite {

@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
import org.apache.spark.{Accumulator, Logging => SparkLogging}
+ import org.apache.spark.util.DateUtils

/**
* A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types._
import org.apache.spark.Logging
+ import org.apache.spark.util.DateUtils

private[sql] object JsonRDD extends Logging {

@@ -22,15 +22,14 @@ import java.sql.{Date, Timestamp}
import org.scalactic.Tolerance._

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.json.JsonRDD.{compatibleType, enforceCorrectType}
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{DateUtils, Utils}

class JsonSuite extends QueryTest {
import org.apache.spark.sql.json.TestJsonData._
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
+ import org.apache.spark.util.DateUtils

// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
@@ -26,6 +26,7 @@ import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types
import org.apache.spark.sql.types._
+ import org.apache.spark.util.DateUtils

/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -34,7 +34,7 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.types.DateUtils
+ import org.apache.spark.util.DateUtils

/**
* A trait for subclasses that handle table scans.