Commit d181c2a

rxin authored and marmbrus committed
[SPARK-5323][SQL] Remove Row's Seq inheritance.
Author: Reynold Xin <[email protected]>

Closes #4115 from rxin/row-seq and squashes the following commits:

e33abd8 [Reynold Xin] Fixed compilation error.
cceb650 [Reynold Xin] Python test fixes, and removal of WrapDynamic.
0334a52 [Reynold Xin] mkString.
9cdeb7d [Reynold Xin] Hive tests.
15681c2 [Reynold Xin] Fix more test cases.
ea9023a [Reynold Xin] Fixed a catalyst test.
c5e2cb5 [Reynold Xin] Minor patch up.
b9cab7c [Reynold Xin] [SPARK-5323][SQL] Remove Row's Seq inheritance.
1 parent bc20a52 · commit d181c2a


47 files changed: +1018, -956 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala

Lines changed: 71 additions & 4 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.util.hashing.MurmurHash3
+
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 
 
@@ -32,7 +34,7 @@ object Row {
    * }
    * }}}
    */
-  def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+  def unapplySeq(row: Row): Some[Seq[Any]] = Some(row.toSeq)
 
   /**
    * This method can be used to construct a [[Row]] with the given values.
@@ -43,6 +45,16 @@ object Row {
    * This method can be used to construct a [[Row]] from a [[Seq]] of values.
    */
   def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)
+
+  def fromTuple(tuple: Product): Row = fromSeq(tuple.productIterator.toSeq)
+
+  /**
+   * Merge multiple rows into a single row, one after another.
+   */
+  def merge(rows: Row*): Row = {
+    // TODO: Improve the performance of this if used in performance critical part.
+    new GenericRow(rows.flatMap(_.toSeq).toArray)
+  }
 }
 
 
@@ -103,7 +115,13 @@ object Row {
  *
  * @group row
  */
-trait Row extends Seq[Any] with Serializable {
+trait Row extends Serializable {
+  /** Number of elements in the Row. */
+  def size: Int = length
+
+  /** Number of elements in the Row. */
+  def length: Int
+
   /**
    * Returns the value at position i. If the value is null, null is returned. The following
    * is a mapping between Spark SQL types and return types:
@@ -291,12 +309,61 @@ trait Row extends Seq[Any] with Serializable {
 
   /** Returns true if there are any NULL values in this row. */
   def anyNull: Boolean = {
-    val l = length
+    val len = length
     var i = 0
-    while (i < l) {
+    while (i < len) {
       if (isNullAt(i)) { return true }
       i += 1
     }
     false
   }
+
+  override def equals(that: Any): Boolean = that match {
+    case null => false
+    case that: Row =>
+      if (this.length != that.length) {
+        return false
+      }
+      var i = 0
+      val len = this.length
+      while (i < len) {
+        if (apply(i) != that.apply(i)) {
+          return false
+        }
+        i += 1
+      }
+      true
+    case _ => false
+  }
+
+  override def hashCode: Int = {
+    // Using Scala's Seq hash code implementation.
+    var n = 0
+    var h = MurmurHash3.seqSeed
+    val len = length
+    while (n < len) {
+      h = MurmurHash3.mix(h, apply(n).##)
+      n += 1
+    }
+    MurmurHash3.finalizeHash(h, n)
+  }
+
+  /* ---------------------- utility methods for Scala ---------------------- */
+
+  /**
+   * Return a Scala Seq representing the row. Elements are placed in the same order in the Seq.
+   */
+  def toSeq: Seq[Any]
+
+  /** Displays all elements of this sequence in a string (without a separator). */
+  def mkString: String = toSeq.mkString
+
+  /** Displays all elements of this sequence in a string using a separator string. */
+  def mkString(sep: String): String = toSeq.mkString(sep)
+
+  /**
+   * Displays all elements of this traversable or iterator in a string using
+   * start, end, and separator strings.
+   */
+  def mkString(start: String, sep: String, end: String): String = toSeq.mkString(start, sep, end)
 }

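With the Seq inheritance gone, Row's construction, pattern matching, equality, and rendering all come from the definitions above, and Seq-style operations need an explicit toSeq. A minimal sketch of the resulting API, using only methods shown in this diff (the sample values are invented for illustration):

import org.apache.spark.sql.Row

// Factory methods from object Row.
val a = Row("Alice", 30)                 // varargs constructor
val b = Row.fromTuple(("Bob", 25))       // any Product works
val merged = Row.merge(a, b)             // "Alice", 30, "Bob", 25

// unapplySeq still supports pattern matching, now backed by row.toSeq.
val greeting = a match {
  case Row(name: String, age: Int) => s"$name is $age"
}
assert(greeting == "Alice is 30")

// Seq operations now require the explicit conversion.
val strings = merged.toSeq.collect { case s: String => s }

// equals compares elements positionally, so structurally equal rows match.
assert(a == Row("Alice", 30))
assert(a.mkString("[", ",", "]") == "[Alice,30]")

The positional equals/hashCode pair preserves the semantics rows had as Seq[Any], so code that uses rows as keys in hash-based collections keeps working across the change.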
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 2 additions & 1 deletion
@@ -84,8 +84,9 @@ trait ScalaReflection {
   }
 
   def convertRowToScala(r: Row, schema: StructType): Row = {
+    // TODO: This is very slow!!!
     new GenericRow(
-      r.zip(schema.fields.map(_.dataType))
+      r.toSeq.zip(schema.fields.map(_.dataType))
         .map(r_dt => convertToScala(r_dt._1, r_dt._2)).toArray)
   }
 

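The same migration pattern repeats throughout the commit: code that previously used Row's inherited Seq methods (zip, map, and so on) now converts explicitly via toSeq. A hedged sketch of the idea, with invented values and stand-in converters in place of convertToScala and the schema's DataTypes:

import org.apache.spark.sql.Row

val row = Row("1", "2.5")
// Stand-ins for the per-field converters derived from the schema.
val converters: Seq[String => Any] = Seq(_.toInt, _.toDouble)

// Before this commit, row.zip(converters) compiled because Row was a Seq[Any].
// Now the Seq view is requested explicitly:
val converted = Row.fromSeq(
  row.toSeq.zip(converters).map { case (v, f) => f(v.asInstanceOf[String]) })

assert(converted == Row(1, 2.5))

Row.fromSeq then rebuilds a GenericRow from the converted values, which is exactly the shape of convertRowToScala above.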
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 0 additions & 3 deletions
@@ -272,9 +272,6 @@ package object dsl {
     def sfilter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
       Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan)
 
-    def sfilter(dynamicUdf: (DynamicRow) => Boolean) =
-      Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan)
-
     def sample(
         fraction: Double,
         withReplacement: Boolean = true,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 2 additions & 1 deletion
@@ -407,7 +407,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     val casts = from.fields.zip(to.fields).map {
       case (fromField, toField) => cast(fromField.dataType, toField.dataType)
     }
-    buildCast[Row](_, row => Row(row.zip(casts).map {
+    // TODO: This is very slow!
+    buildCast[Row](_, row => Row(row.toSeq.zip(casts).map {
       case (v, cast) => if (v == null) null else cast(v)
     }: _*))
   }
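A closing note on compatibility: the hand-rolled hashCode added to Row.scala above says it uses "Scala's Seq hash code implementation", which is what keeps row hashes stable for code that relied on the old Seq-based hashing. A standalone check of that claim, assuming Scala 2.10/2.11 (the versions Spark targeted at the time), where Seq#hashCode is MurmurHash3's ordered hash; the sample values are invented:

import scala.util.hashing.MurmurHash3

// Mirror of the loop in the new Row.hashCode, applied to a plain Seq.
def rowStyleHash(values: Seq[Any]): Int = {
  var n = 0
  var h = MurmurHash3.seqSeed
  while (n < values.length) {
    h = MurmurHash3.mix(h, values(n).##)  // .## is the null-safe hashCode
    n += 1
  }
  MurmurHash3.finalizeHash(h, n)
}

val values: Seq[Any] = Seq(1, "a", null, 3.5)
assert(rowStyleHash(values) == values.hashCode)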
