
[SPARK-5323][SQL] Remove Row's Seq inheritance. #4115

Status: Closed. 8 commits (diff shown from all commits).
75 changes: 71 additions & 4 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.catalyst.expressions.GenericRow


@@ -32,7 +34,7 @@ object Row {
* }
* }}}
*/
- def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+ def unapplySeq(row: Row): Some[Seq[Any]] = Some(row.toSeq)

/**
* This method can be used to construct a [[Row]] with the given values.
@@ -43,6 +45,16 @@ object Row {
* This method can be used to construct a [[Row]] from a [[Seq]] of values.
*/
def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)

def fromTuple(tuple: Product): Row = fromSeq(tuple.productIterator.toSeq)

/**
* Merge multiple rows into a single row, one after another.
*/
def merge(rows: Row*): Row = {
// TODO: Improve the performance of this if used in a performance-critical path.
new GenericRow(rows.flatMap(_.toSeq).toArray)
}
}
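
Taken together, the companion object now offers several construction paths. A minimal usage sketch of the new factory methods (the values and variable names are illustrative, not from the patch):

```scala
import org.apache.spark.sql.Row

val a = Row(1, "alice")             // varargs factory
val b = Row.fromSeq(Seq(2, "bob"))  // from a Seq of values
val c = Row.fromTuple((3, "carol")) // from any Product, e.g. a tuple
val merged = Row.merge(a, b, c)     // values concatenated in order
assert(merged.length == 6)

// unapplySeq keeps pattern matching working without Seq inheritance:
val Row(id: Int, name: String) = a
```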


@@ -103,7 +115,13 @@
*
* @group row
*/
- trait Row extends Seq[Any] with Serializable {
+ trait Row extends Serializable {
/** Number of elements in the Row. */
def size: Int = length

/** Number of elements in the Row. */
def length: Int

/**
* Returns the value at position i. If the value is null, null is returned. The following
* is a mapping between Spark SQL types and return types:
@@ -291,12 +309,61 @@ trait Row extends Seq[Any] with Serializable {

/** Returns true if there are any NULL values in this row. */
def anyNull: Boolean = {
- val l = length
+ val len = length
var i = 0
- while (i < l) {
+ while (i < len) {
if (isNullAt(i)) { return true }
i += 1
}
false
}

override def equals(that: Any): Boolean = that match {
case null => false
case that: Row =>
if (this.length != that.length) {
return false
}
var i = 0
val len = this.length
while (i < len) {
if (apply(i) != that.apply(i)) {
return false
}
i += 1
}
true
case _ => false
}

override def hashCode: Int = {
// Using Scala's Seq hash code implementation.
var n = 0
var h = MurmurHash3.seqSeed
val len = length
while (n < len) {
h = MurmurHash3.mix(h, apply(n).##)
n += 1
}
MurmurHash3.finalizeHash(h, n)
}

/* ---------------------- utility methods for Scala ---------------------- */

/**
* Returns a Scala Seq representing the row. Elements are placed in the same order in the Seq.
*/
def toSeq: Seq[Any]
Contributor: Would like to have a toArray here, since most Row subclasses are backed by an array, which already has plenty of rich operations.

Contributor (author): It's not that bad in that case, since toSeq just returns a WrappedArray. We can do it in a separate PR if we think it is important.
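
For readers following the thread: the suggested helper would presumably be a thin wrapper over toSeq, along these lines (a hypothetical sketch, not part of this patch):

```scala
/** Hypothetical toArray from the review discussion; not in this PR. */
def toArray: Array[Any] = toSeq.toArray
```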


/** Displays all elements of this sequence in a string (without a separator). */
def mkString: String = toSeq.mkString

/** Displays all elements of this sequence in a string using a separator string. */
def mkString(sep: String): String = toSeq.mkString(sep)

/**
* Displays all elements of this traversable or iterator in a string using
* start, end, and separator strings.
*/
def mkString(start: String, sep: String, end: String): String = toSeq.mkString(start, sep, end)
}
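
Since Row no longer inherits equals and hashCode from Seq, the hand-written versions above preserve positional value semantics: rows with the same length and element values compare equal and hash identically regardless of the backing class. A small illustrative check, reusing the hypothetical rows from the earlier sketch:

```scala
val x = Row(1, null, "a")
val y = Row.fromSeq(Seq(1, null, "a"))
assert(x == y)                           // positional value equality
assert(x.hashCode == y.hashCode)         // MurmurHash3 ordered hash over the values
assert(x.mkString(", ") == "1, null, a") // mkString delegates to toSeq
```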
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -84,8 +84,9 @@ trait ScalaReflection {
}

def convertRowToScala(r: Row, schema: StructType): Row = {
+ // TODO: This is very slow!!!
new GenericRow(
- r.zip(schema.fields.map(_.dataType))
+ r.toSeq.zip(schema.fields.map(_.dataType))
.map(r_dt => convertToScala(r_dt._1, r_dt._2)).toArray)
}
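
The new TODO flags the per-row allocations from zip and map. A hedged sketch of the index-based loop the comment points toward, assuming the surrounding convertToScala helper (convertRowToScalaFast is a made-up name, not in the patch):

```scala
def convertRowToScalaFast(r: Row, schema: StructType): Row = {
  val fields = schema.fields
  val converted = new Array[Any](r.length)
  var i = 0
  while (i < converted.length) {
    // Convert each value in place, avoiding the intermediate zipped Seq.
    converted(i) = convertToScala(r(i), fields(i).dataType)
    i += 1
  }
  new GenericRow(converted)
}
```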

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -272,9 +272,6 @@ package object dsl {
def sfilter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan)

- def sfilter(dynamicUdf: (DynamicRow) => Boolean) =
-   Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan)

def sample(
fraction: Double,
withReplacement: Boolean = true,
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -407,7 +407,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val casts = from.fields.zip(to.fields).map {
case (fromField, toField) => cast(fromField.dataType, toField.dataType)
}
- buildCast[Row](_, row => Row(row.zip(casts).map {
+ // TODO: This is very slow!
+ buildCast[Row](_, row => Row(row.toSeq.zip(casts).map {
case (v, cast) => if (v == null) null else cast(v)
}: _*))
}
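
The struct cast precomputes one cast function per field and applies them positionally, passing nulls through untouched. Stripped of the Catalyst types, the pattern looks roughly like this (illustrative values only):

```scala
val values: Seq[Any] = Seq("1", null, "3.5")
val casts: Seq[Any => Any] = Seq(
  v => v.toString.toInt,    // cast field 0 to Int
  identity,                 // leave field 1 unchanged
  v => v.toString.toDouble) // cast field 2 to Double
val casted = values.zip(casts).map { case (v, c) => if (v == null) null else c(v) }
// casted == Seq(1, null, 3.5)
```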