From acc5c48fff8de615cb6fa6d4b29bc049d2bc3cc9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 16 Mar 2014 21:19:33 +0800 Subject: [PATCH 01/19] Added Hive test files to .gitignore Signed-off-by: Cheng Lian --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 3d178992123da..d726fa73e458a 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ dist/ spark-*-bin.tar.gz unit-tests.log /lib/ +sql/hive/src/test/resources From 34f3c1941cebf5f045ace0c9e5dd8c8cde09650d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 16 Mar 2014 21:20:04 +0800 Subject: [PATCH 02/19] Added TypeTag field to all NativeTypes Signed-off-by: Cheng Lian --- .../spark/sql/catalyst/types/dataTypes.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 6eb2b62eccb48..90a9f9f7e548d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql package catalyst package types -import expressions.Expression +import scala.reflect.runtime.universe.{typeTag, TypeTag} + +import org.apache.spark.sql.catalyst.expressions.Expression abstract class DataType { /** Matches any expression that evaluates to this DataType */ @@ -33,11 +35,13 @@ case object NullType extends DataType abstract class NativeType extends DataType { type JvmType + @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] } case object StringType extends NativeType { type JvmType = String + @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } case object BinaryType extends DataType { @@ -45,6 +49,7 @@ case object BinaryType extends DataType { } case object BooleanType extends NativeType { type JvmType = Boolean + @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } @@ -71,6 +76,7 @@ abstract class IntegralType extends NumericType { case object LongType extends IntegralType { type JvmType = Long + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Long]] val integral = implicitly[Integral[Long]] val ordering = implicitly[Ordering[JvmType]] @@ -78,6 +84,7 @@ case object LongType extends IntegralType { case object IntegerType extends IntegralType { type JvmType = Int + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Int]] val integral = implicitly[Integral[Int]] val ordering = implicitly[Ordering[JvmType]] @@ -85,6 +92,7 @@ case object IntegerType extends IntegralType { case object ShortType extends IntegralType { type JvmType = Short + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Short]] val integral = implicitly[Integral[Short]] val ordering = implicitly[Ordering[JvmType]] @@ -92,6 +100,7 @@ case object ShortType extends IntegralType { case object ByteType extends IntegralType { type JvmType = Byte + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Byte]] val integral = implicitly[Integral[Byte]] val ordering = implicitly[Ordering[JvmType]] @@ -110,6 +119,7 @@ abstract class FractionalType extends NumericType { case object DecimalType extends FractionalType { type JvmType = BigDecimal + @transient lazy val tag = typeTag[JvmType] val numeric = 
implicitly[Numeric[BigDecimal]] val fractional = implicitly[Fractional[BigDecimal]] val ordering = implicitly[Ordering[JvmType]] @@ -117,6 +127,7 @@ case object DecimalType extends FractionalType { case object DoubleType extends FractionalType { type JvmType = Double + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Double]] val fractional = implicitly[Fractional[Double]] val ordering = implicitly[Ordering[JvmType]] @@ -124,6 +135,7 @@ case object DoubleType extends FractionalType { case object FloatType extends FractionalType { type JvmType = Float + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Float]] val fractional = implicitly[Fractional[Float]] val ordering = implicitly[Ordering[JvmType]] From 2d09066f6aac239e488a73363d70e7126a91fec9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 16 Mar 2014 23:56:09 +0800 Subject: [PATCH 03/19] Added KryoSerializer Signed-off-by: Cheng Lian --- .../apache/spark/sql/execution/Exchange.scala | 4 ++-- .../spark/sql/execution/KryoSerializer.scala | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 72dc5ec6ad33c..6c13c537cd14e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -25,7 +25,7 @@ import com.esotericsoftware.kryo.io.{Output, Input} import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner} import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} import org.apache.spark.util.MutablePair import catalyst.rules.Rule @@ -33,7 +33,7 @@ import catalyst.errors._ import catalyst.expressions._ import catalyst.plans.physical._ -private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { +private class SparkSqlSerializer(conf: SparkConf) extends SparkKryoSerializer(conf) { override def newKryo(): Kryo = { val kryo = new Kryo kryo.setRegistrationRequired(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala new file mode 100644 index 0000000000000..fdc9c3872798f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala @@ -0,0 +1,22 @@ +package org.apache.spark.sql +package execution + +import java.nio.ByteBuffer + +import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} +import org.apache.spark.{SparkConf, SparkEnv} + +object KryoSerializer { + @transient lazy val ser: SparkKryoSerializer = { + val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + new SparkKryoSerializer(sparkConf) + } + + def serialize[T](o: T): Array[Byte] = { + ser.newInstance().serialize(o).array() + } + + def deserialize[T](bytes: Array[Byte]): T = { + ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) + } +} From f18ddc647306c628fdca59136d241c1fe8a1c85e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 16 Mar 2014 23:56:35 +0800 Subject: [PATCH 04/19] Added ColumnTypes and test suite Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnType.scala | 172 +++++++++++++++ 
.../spark/sql/columnar/ColumnTypeSuite.scala | 196 ++++++++++++++++++ 2 files changed, 368 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala new file mode 100644 index 0000000000000..bddb64f6fc363 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -0,0 +1,172 @@ +package org.apache.spark.sql +package columnar + +import java.nio.ByteBuffer + +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.KryoSerializer + +/** + * An abstract class that represents type of a column. Used to append/extract Java objects into/from + * the underlying [[ByteBuffer]] of a column. + * + * @param typeId A unique ID representing the type. + * @param defaultSize Default size in bytes for one element of type T (e.g. 4 for `Int`). + * @tparam T Scala data type for the column. + * @tparam JvmType Underlying Java type to represent the elements. + */ +sealed abstract class ColumnType[T <: DataType, JvmType]( + val typeId: Int, + val defaultSize: Int) { + + /** + * Extracts a value out of the buffer at the buffer's current position. + */ + def extract(buffer: ByteBuffer): JvmType + + /** + * Appends the given value v of type T into the given ByteBuffer. + */ + def append(v: JvmType, buffer: ByteBuffer) + + /** + * Returns the size of the value. This is used to calculate the size of variable length types + * such as byte arrays and strings. + */ + def actualSize(v: JvmType): Int = defaultSize + + /** + * Creates a duplicated copy of the value. + */ + def clone(v: JvmType): JvmType = v +} + +private[columnar] abstract class NativeColumnType[T <: NativeType]( + val dataType: T, + typeId: Int, + defaultSize: Int) + extends ColumnType[T, T#JvmType](typeId, defaultSize) { + + /** + * Scala TypeTag. Can be used to create primitive arrays and hash tables. 
+ */ + def scalaTag = dataType.tag +} + +object INT extends NativeColumnType(IntegerType, 0, 4) { + def append(v: Int, buffer: ByteBuffer) { + buffer.putInt(v) + } + + def extract(buffer: ByteBuffer) = { + buffer.getInt() + } +} + +object LONG extends NativeColumnType(LongType, 1, 8) { + override def append(v: Long, buffer: ByteBuffer) { + buffer.putLong(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getLong() + } +} + +object FLOAT extends NativeColumnType(FloatType, 2, 4) { + override def append(v: Float, buffer: ByteBuffer) { + buffer.putFloat(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getFloat() + } +} + +object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { + override def append(v: Double, buffer: ByteBuffer) { + buffer.putDouble(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getDouble() + } +} + +object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { + override def append(v: Boolean, buffer: ByteBuffer) { + buffer.put(if (v) 1.toByte else 0.toByte) + } + + override def extract(buffer: ByteBuffer) = { + if (buffer.get() == 1) true else false + } +} + +object BYTE extends NativeColumnType(ByteType, 5, 1) { + override def append(v: Byte, buffer: ByteBuffer) { + buffer.put(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.get() + } +} + +object SHORT extends NativeColumnType(ShortType, 6, 2) { + override def append(v: Short, buffer: ByteBuffer) { + buffer.putShort(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getShort() + } +} + +object STRING extends NativeColumnType(StringType, 7, 8) { + override def actualSize(v: String): Int = v.getBytes.length + 4 + + override def append(v: String, buffer: ByteBuffer) { + val stringBytes = v.getBytes() + buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length) + } + + override def extract(buffer: ByteBuffer) = { + val length = buffer.getInt() + val stringBytes = new Array[Byte](length) + buffer.get(stringBytes, 0, length) + new String(stringBytes) + } +} + +object BINARY extends ColumnType[BinaryType.type, Array[Byte]](8, 16) { + override def actualSize(v: Array[Byte]) = v.length + 4 + + override def append(v: Array[Byte], buffer: ByteBuffer) { + buffer.putInt(v.length).put(v, 0, v.length) + } + + override def extract(buffer: ByteBuffer) = { + val length = buffer.getInt() + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + bytes + } +} + +object GENERIC extends ColumnType[DataType, Any](9, 16) { + // TODO (lian) Can we avoid serialization here? 
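
A brief illustrative sketch (not part of the patch): the GENERIC column type relies on the KryoSerializer helper introduced in PATCH 03 for values that have no primitive encoding. Assuming only that helper, a minimal round trip — the same shape as the GENERIC test case added later in this patch — looks roughly like:

    import org.apache.spark.sql.execution.KryoSerializer

    val obj   = Map(1 -> "spark", 2 -> "sql")
    val bytes = KryoSerializer.serialize(obj)                         // Any => Array[Byte]
    val back  = KryoSerializer.deserialize[Map[Int, String]](bytes)   // Array[Byte] => T
    assert(back == obj)
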
+ override def actualSize(v: Any) = KryoSerializer.serialize(v).size + + override def append(v: Any, buffer: ByteBuffer) { + val serialized = KryoSerializer.serialize(v) + buffer.putInt(serialized.length).put(serialized) + } + + override def extract(buffer: ByteBuffer) = { + val length = buffer.getInt() + val serialized = new Array[Byte](length) + buffer.get(serialized, 0, length) + KryoSerializer.deserialize[Any](serialized) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala new file mode 100644 index 0000000000000..a8311f943f044 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -0,0 +1,196 @@ +package org.apache.spark.sql +package columnar + +import java.nio.ByteBuffer + +import org.scalatest.FunSuite +import scala.util.Random +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.KryoSerializer + +class ColumnTypeSuite extends FunSuite { + val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING) + + test("defaultSize") { + val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8) + + columnTypes.zip(defaultSize).foreach { case (columnType, size) => + assert(columnType.defaultSize === size) + } + } + + test("actualSize") { + val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5) + val actualSizes = Seq( + INT.actualSize(Int.MaxValue), + SHORT.actualSize(Short.MaxValue), + LONG.actualSize(Long.MaxValue), + BYTE.actualSize(Byte.MaxValue), + DOUBLE.actualSize(Double.MaxValue), + FLOAT.actualSize(Float.MaxValue), + STRING.actualSize("hello")) + + expectedSizes.zip(actualSizes).foreach { case (expected, actual) => + assert(expected === actual) + } + } + + testNumericColumnType[BooleanType.type, Boolean]( + BOOLEAN, + Array.fill(4)(Random.nextBoolean()), + ByteBuffer.allocate(32), + (buffer: ByteBuffer, v: Boolean) => { + buffer.put((if (v) 1 else 0).toByte) + }, + (buffer: ByteBuffer) => { + buffer.get() == 1 + }) + + testNumericColumnType[IntegerType.type, Int]( + INT, + Array.fill(4)(Random.nextInt()), + ByteBuffer.allocate(32), + (_: ByteBuffer).putInt(_), + (_: ByteBuffer).getInt) + + testNumericColumnType[ShortType.type, Short]( + SHORT, + Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]), + ByteBuffer.allocate(32), + (_: ByteBuffer).putShort(_), + (_: ByteBuffer).getShort) + + testNumericColumnType[LongType.type, Long]( + LONG, + Array.fill(4)(Random.nextLong()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putLong(_), + (_: ByteBuffer).getLong) + + testNumericColumnType[ByteType.type, Byte]( + BYTE, + Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]), + ByteBuffer.allocate(64), + (_: ByteBuffer).put(_), + (_: ByteBuffer).get) + + testNumericColumnType[DoubleType.type, Double]( + DOUBLE, + Array.fill(4)(Random.nextDouble()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putDouble(_), + (_: ByteBuffer).getDouble) + + testNumericColumnType[FloatType.type, Float]( + FLOAT, + Array.fill(4)(Random.nextFloat()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putFloat(_), + (_: ByteBuffer).getFloat) + + test("STRING") { + val buffer = ByteBuffer.allocate(128) + val seq = Array("hello", "world", "spark", "sql") + + seq.map(_.getBytes).foreach { bytes: Array[Byte] => + buffer.putInt(bytes.length).put(bytes) + } + + buffer.rewind() + seq.foreach { s => + assert(s === STRING.extract(buffer)) + } + + buffer.rewind() + seq.foreach(STRING.append(_, buffer)) + + buffer.rewind() + 
seq.foreach { s => + val length = buffer.getInt + assert(length === s.getBytes.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(s === new String(bytes)) + } + } + + test("BINARY") { + val buffer = ByteBuffer.allocate(128) + val seq = Array.fill(4) { + val bytes = new Array[Byte](4) + Random.nextBytes(bytes) + bytes + } + + seq.foreach { bytes => + buffer.putInt(bytes.length).put(bytes) + } + + buffer.rewind() + seq.foreach { b => + assert(b === BINARY.extract(buffer)) + } + + buffer.rewind() + seq.foreach(BINARY.append(_, buffer)) + + buffer.rewind() + seq.foreach { b => + val length = buffer.getInt + assert(length === b.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(b === bytes) + } + } + + test("GENERIC") { + val buffer = ByteBuffer.allocate(512) + val obj = Map(1 -> "spark", 2 -> "sql") + val serializedObj = KryoSerializer.serialize(obj) + + GENERIC.append(obj, buffer) + buffer.rewind() + + val length = buffer.getInt() + assert(length === serializedObj.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(obj === KryoSerializer.deserialize(bytes)) + + buffer.rewind() + buffer.putInt(serializedObj.length).put(serializedObj) + + buffer.rewind() + assert(obj === GENERIC.extract(buffer)) + } + + def testNumericColumnType[T <: DataType, JvmType]( + columnType: ColumnType[T, JvmType], + seq: Seq[JvmType], + buffer: ByteBuffer, + putter: (ByteBuffer, JvmType) => Unit, + getter: (ByteBuffer) => JvmType) { + + test(columnType.getClass.getSimpleName.stripSuffix("$")) { + buffer.rewind() + seq.foreach(putter(buffer, _)) + + buffer.rewind() + seq.foreach { i => + assert(i === columnType.extract(buffer)) + } + + buffer.rewind() + seq.foreach(columnType.append(_, buffer)) + + buffer.rewind() + seq.foreach { i => + assert(i === getter(buffer)) + } + } + } +} From c01a17785170b0ec2e84d4c9bfa6dbd91084ff7f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 17 Mar 2014 00:16:43 +0800 Subject: [PATCH 05/19] Added column builder classes and test suite Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnBuilder.scala | 148 ++++++++++++++++++ .../spark/sql/columnar/ColumnType.scala | 2 +- .../sql/columnar/NullableColumnBuilder.scala | 57 +++++++ .../spark/sql/columnar/ColumnTypeSuite.scala | 4 +- .../columnar/NullableColumnBuilderSuite.scala | 109 +++++++++++++ 5 files changed, 318 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala new file mode 100644 index 0000000000000..505a7a3c20b2f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -0,0 +1,148 @@ +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.types._ + +trait ColumnBuilder { + /** + * Initializes with an approximate lower bound on the expected number of elements in this column. 
+ */ + def initialize(initialSize: Int, columnName: String = "") + + def append(row: Row, ordinal: Int) + + def build(): ByteBuffer +} + +object ColumnBuilder { + val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104 + + def ensureFreeSpace(orig: ByteBuffer, size: Int) = { + if (orig.remaining >= size) { + orig + } else { + // grow in steps of initial size + val capacity = orig.capacity() + val newSize = capacity + size.max(capacity / 8 + 1) + val pos = orig.position() + + orig.clear() + ByteBuffer + .allocate(newSize) + .order(ByteOrder.nativeOrder()) + .put(orig.array(), 0, pos) + } + } +} + +abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { + import ColumnBuilder._ + + private var _columnName: String = _ + private var _buffer: ByteBuffer = _ + + def columnType: ColumnType[T, JvmType] + + override def initialize(initialSize: Int, columnName: String = "") = { + val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize + _columnName = columnName + _buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize) + _buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId) + } + + // Have to give a concrete implementation to make mixin possible + override def append(row: Row, ordinal: Int) { + doAppend(row, ordinal) + } + + // Concrete `ColumnBuilder`s can override this method to append values + protected def doAppend(row: Row, ordinal: Int) + + // Helper method to append primitive values (to avoid boxing cost) + protected def appendValue(v: JvmType) { + _buffer = ensureFreeSpace(_buffer, columnType.actualSize(v)) + columnType.append(v, _buffer) + } + + override def build() = { + _buffer.limit(_buffer.position()).rewind() + _buffer + } +} + +abstract class NativeColumnBuilder[T <: NativeType](val columnType: NativeColumnType[T]) + extends BasicColumnBuilder[T, T#JvmType] + with NullableColumnBuilder[T, T#JvmType] + +class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getBoolean(ordinal)) + } +} + +class IntColumnBuilder extends NativeColumnBuilder(INT) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getInt(ordinal)) + } +} + +class ShortColumnBuilder extends NativeColumnBuilder(SHORT) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getShort(ordinal)) + } +} + +class LongColumnBuilder extends NativeColumnBuilder(LONG) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getLong(ordinal)) + } +} + +class ByteColumnBuilder extends NativeColumnBuilder(BYTE) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getByte(ordinal)) + } +} + +class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getDouble(ordinal)) + } +} + +class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getFloat(ordinal)) + } +} + +class StringColumnBuilder extends NativeColumnBuilder(STRING) { + override def doAppend(row: Row, ordinal: Int) { + appendValue(row.getString(ordinal)) + } +} + +class BinaryColumnBuilder + extends BasicColumnBuilder[BinaryType.type, Array[Byte]] + with NullableColumnBuilder[BinaryType.type, Array[Byte]] { + + val columnType = BINARY + + override def doAppend(row: Row, ordinal: Int) { + appendValue(row(ordinal).asInstanceOf[Array[Byte]]) + } +} + +// TODO (lian) Add support for array, struct and map +class GenericColumnBuilder + 
extends BasicColumnBuilder[DataType, Any] + with NullableColumnBuilder[DataType, Any] { + + val columnType = GENERIC + + override def doAppend(row: Row, ordinal: Int) { + appendValue(row(ordinal)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index bddb64f6fc363..4c85f1c261dd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -155,7 +155,7 @@ object BINARY extends ColumnType[BinaryType.type, Array[Byte]](8, 16) { } object GENERIC extends ColumnType[DataType, Any](9, 16) { - // TODO (lian) Can we avoid serialization here? + // TODO (lian) Must avoid duplicated serialization here. override def actualSize(v: Any) = KryoSerializer.serialize(v).size override def append(v: Any, buffer: ByteBuffer) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala new file mode 100644 index 0000000000000..1a9a921222e03 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -0,0 +1,57 @@ +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.types.DataType + +/** + * Builds a nullable column. The byte buffer of a nullable column contains: + * - 4 bytes for the null count (number of nulls) + * - positions for each null, in ascending order + * - the non-null data (column data type, compression type, data...) + */ +trait NullableColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { + import ColumnBuilder._ + + private var _nulls: ByteBuffer = _ + private var _pos: Int = _ + private var _nullCount: Int = _ + + abstract override def initialize(initialSize: Int, columnName: String) { + _nulls = ByteBuffer.allocate(1024) + _nulls.order(ByteOrder.nativeOrder()) + _pos = 0 + _nullCount = 0 + super.initialize(initialSize, columnName) + } + + abstract override def append(row: Row, ordinal: Int) { + if (row.isNullAt(ordinal)) { + _nulls = ensureFreeSpace(_nulls, 4) + _nulls.putInt(_pos) + _nullCount += 1 + } else { + super.append(row, ordinal) + } + _pos += 1 + } + + abstract override def build(): ByteBuffer = { + val nonNulls = super.build() + val nullDataLen = _nulls.position() + + _nulls.limit(nullDataLen) + _nulls.rewind() + + // 4 bytes for null count + null positions + non nulls + ByteBuffer + .allocate(4 + nullDataLen + nonNulls.limit) + .order(ByteOrder.nativeOrder()) + .putInt(_nullCount) + .put(_nulls) + .put(nonNulls) + .rewind() + .asInstanceOf[ByteBuffer] + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index a8311f943f044..22c6fc5fb30fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -3,8 +3,10 @@ package columnar import java.nio.ByteBuffer -import org.scalatest.FunSuite import scala.util.Random + +import org.scalatest.FunSuite + import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.KryoSerializer diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala new file mode 100644 index 0000000000000..ff28fc3d62915 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -0,0 +1,109 @@ +package org.apache.spark.sql +package columnar + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.types.DataType + +class NullableColumnBuilderSuite extends FunSuite { + testNullableColumnBuilder(new IntColumnBuilder, INT) + testNullableColumnBuilder(new LongColumnBuilder, LONG) + testNullableColumnBuilder(new ShortColumnBuilder, SHORT) + testNullableColumnBuilder(new BooleanColumnBuilder, BOOLEAN) + testNullableColumnBuilder(new ByteColumnBuilder, BYTE) + testNullableColumnBuilder(new StringColumnBuilder, STRING) + testNullableColumnBuilder(new DoubleColumnBuilder, DOUBLE) + testNullableColumnBuilder(new FloatColumnBuilder, FLOAT) + testNullableColumnBuilder(new BinaryColumnBuilder, BINARY) + testNullableColumnBuilder(new GenericColumnBuilder, GENERIC) + + val nonNullTestRow = { + val row = new GenericMutableRow(10) + + row(INT.typeId) = Random.nextInt() + row(LONG.typeId) = Random.nextLong() + row(FLOAT.typeId) = Random.nextFloat() + row(DOUBLE.typeId) = Random.nextDouble() + row(BOOLEAN.typeId) = Random.nextBoolean() + row(BYTE.typeId) = Random.nextInt(Byte.MaxValue).asInstanceOf[Byte] + row(SHORT.typeId) = Random.nextInt(Short.MaxValue).asInstanceOf[Short] + row(STRING.typeId) = Random.nextString(4) + row(BINARY.typeId) = { + val bytes = new Array[Byte](4) + Random.nextBytes(bytes) + bytes + } + row(GENERIC.typeId) = Map(Random.nextInt() -> Random.nextString(4)) + + row + } + + val nullTestRow = { + val row = new GenericMutableRow(10) + (0 until 10).foreach(row.setNullAt) + row + } + + def testNullableColumnBuilder[T <: DataType, JvmType]( + columnBuilder: ColumnBuilder, + columnType: ColumnType[T, JvmType]) { + + val columnBuilderName = columnBuilder.getClass.getSimpleName + + test(s"$columnBuilderName: empty column") { + columnBuilder.initialize(4) + + val buffer = columnBuilder.build() + + // For null count + assert(buffer.getInt === 0) + // For column type ID + assert(buffer.getInt() === columnType.typeId) + assert(!buffer.hasRemaining) + } + + test(s"$columnBuilderName: buffer size auto growth") { + columnBuilder.initialize(4) + + (0 until 4) foreach { _ => + columnBuilder.append(nonNullTestRow, columnType.typeId) + } + + val buffer = columnBuilder.build() + + // For null count + assert(buffer.getInt() === 0) + // For column type ID + assert(buffer.getInt() === columnType.typeId) + } + + test(s"$columnBuilderName: null values") { + columnBuilder.initialize(4) + + (0 until 4) foreach { _ => + columnBuilder.append(nonNullTestRow, columnType.typeId) + columnBuilder.append(nullTestRow, columnType.typeId) + } + + val buffer = columnBuilder.build() + + // For null count + assert(buffer.getInt() === 4) + // For null positions + (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i)) + + // For column type ID + assert(buffer.getInt() === columnType.typeId) + + // For non-null values + (0 until 4).foreach { _ => + assert(columnType.extract(buffer) === nonNullTestRow(columnType.typeId)) + } + + assert(!buffer.hasRemaining) + } + } +} From dbf7a38ff6d015a3c4ece4f555ebc5cf6a03ef07 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 18 Mar 2014 09:39:43 +0800 Subject: [PATCH 06/19] Added ColumnAccessor and 
test suite, refactored ColumnBuilder * ColumnAccessors/ColumnBuilders are now both built with stackable traits * Columnar byte buffer layout changed: column type ID was moved to the first 4 bytes (before null value information) * Generic objects are serialised/deserialised before/after being appended/extracted into/from columnar byte buffers. Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnAccessor.scala | 142 ++++++++++++++++++ .../spark/sql/columnar/ColumnBuilder.scala | 141 ++++++++++------- .../spark/sql/columnar/ColumnType.scala | 20 +-- .../sql/columnar/NullableColumnAccessor.scala | 40 +++++ .../sql/columnar/NullableColumnBuilder.scala | 63 ++++---- .../spark/sql/columnar/ColumnTypeSuite.scala | 20 ++- .../spark/sql/columnar/ColumnarTestData.scala | 38 +++++ .../NullableColumnAccessorSuite.scala | 51 +++++++ .../columnar/NullableColumnBuilderSuite.scala | 97 +++++------- 9 files changed, 456 insertions(+), 156 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala new file mode 100644 index 0000000000000..528a15ec38213 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -0,0 +1,142 @@ +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType} +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.execution.KryoSerializer + +/** + * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is + * extracted from the buffer, instead of directly returning it, the value is set into some field of + * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods + * for primitive values provided by [[MutableRow]]. 
+ */ +trait ColumnAccessor { + initialize() + + protected def initialize() + + def hasNext: Boolean + + def extractTo(row: MutableRow, ordinal: Int) + + protected def underlyingBuffer: ByteBuffer +} + +abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer) + extends ColumnAccessor { + + protected def initialize() {} + + def columnType: ColumnType[T, JvmType] + + def hasNext = buffer.hasRemaining + + def extractTo(row: MutableRow, ordinal: Int) { + doExtractTo(row, ordinal) + } + + protected def doExtractTo(row: MutableRow, ordinal: Int) + + protected def underlyingBuffer = buffer +} + +abstract class NativeColumnAccessor[T <: NativeType]( + buffer: ByteBuffer, + val columnType: NativeColumnType[T]) + extends BasicColumnAccessor[T, T#JvmType](buffer) + with NullableColumnAccessor + +class BooleanColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, BOOLEAN) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setBoolean(ordinal, columnType.extract(buffer)) + } +} + +class IntColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, INT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setInt(ordinal, columnType.extract(buffer)) + } +} + +class ShortColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, SHORT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setShort(ordinal, columnType.extract(buffer)) + } +} + +class LongColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, LONG) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setLong(ordinal, columnType.extract(buffer)) + } +} + +class ByteColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, BYTE) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setByte(ordinal, columnType.extract(buffer)) + } +} + +class DoubleColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, DOUBLE) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setDouble(ordinal, columnType.extract(buffer)) + } +} + +class FloatColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, FLOAT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setFloat(ordinal, columnType.extract(buffer)) + } +} + +class StringColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, STRING) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setString(ordinal, columnType.extract(buffer)) + } +} + +class BinaryColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer) + with NullableColumnAccessor { + + def columnType = BINARY + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row(ordinal) = columnType.extract(buffer) + } +} + +class GenericColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[DataType, Array[Byte]](buffer) + with NullableColumnAccessor { + + def columnType = GENERIC + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + val serialized = columnType.extract(buffer) + row(ordinal) = KryoSerializer.deserialize[Any](serialized) + } +} + +object ColumnAccessor { + def apply(b: ByteBuffer): ColumnAccessor = { + // The first 4 bytes in the buffer indicates the column type. 
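
An illustrative usage sketch (not part of the patch): combined with the ColumnBuilder factory refactored in this commit, the intended builder-to-accessor round trip — mirrored by NullableColumnAccessorSuite below — is roughly:

    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
    import org.apache.spark.sql.columnar._

    val row     = new GenericMutableRow(1)
    val builder = ColumnBuilder(INT.typeId, initialSize = 4)

    (1 to 4).foreach { i =>
      row(0) = i
      builder.appendFrom(row, 0)   // append row(0) into the column's byte buffer
    }

    val accessor = ColumnAccessor(builder.build())
    val out      = new GenericMutableRow(1)
    while (accessor.hasNext) {
      accessor.extractTo(out, 0)   // write the next value (or null) into out(0)
    }
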
+ val buffer = b.duplicate().order(ByteOrder.nativeOrder()) + val columnTypeId = buffer.getInt() + + columnTypeId match { + case INT.typeId => new IntColumnAccessor(buffer) + case LONG.typeId => new LongColumnAccessor(buffer) + case FLOAT.typeId => new FloatColumnAccessor(buffer) + case DOUBLE.typeId => new DoubleColumnAccessor(buffer) + case BOOLEAN.typeId => new BooleanColumnAccessor(buffer) + case BYTE.typeId => new ByteColumnAccessor(buffer) + case SHORT.typeId => new ShortColumnAccessor(buffer) + case STRING.typeId => new StringColumnAccessor(buffer) + case BINARY.typeId => new BinaryColumnAccessor(buffer) + case GENERIC.typeId => new GenericColumnAccessor(buffer) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 505a7a3c20b2f..21b2dcd3a55cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -4,6 +4,7 @@ package columnar import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.KryoSerializer trait ColumnBuilder { /** @@ -11,138 +12,176 @@ trait ColumnBuilder { */ def initialize(initialSize: Int, columnName: String = "") - def append(row: Row, ordinal: Int) + def appendFrom(row: Row, ordinal: Int) def build(): ByteBuffer } -object ColumnBuilder { - val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104 - - def ensureFreeSpace(orig: ByteBuffer, size: Int) = { - if (orig.remaining >= size) { - orig - } else { - // grow in steps of initial size - val capacity = orig.capacity() - val newSize = capacity + size.max(capacity / 8 + 1) - val pos = orig.position() - - orig.clear() - ByteBuffer - .allocate(newSize) - .order(ByteOrder.nativeOrder()) - .put(orig.array(), 0, pos) - } - } -} - abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { import ColumnBuilder._ - private var _columnName: String = _ - private var _buffer: ByteBuffer = _ + private var columnName: String = _ + protected var buffer: ByteBuffer = _ def columnType: ColumnType[T, JvmType] override def initialize(initialSize: Int, columnName: String = "") = { val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize - _columnName = columnName - _buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize) - _buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId) + this.columnName = columnName + buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize) + buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId) } // Have to give a concrete implementation to make mixin possible - override def append(row: Row, ordinal: Int) { - doAppend(row, ordinal) + override def appendFrom(row: Row, ordinal: Int) { + doAppendFrom(row, ordinal) } // Concrete `ColumnBuilder`s can override this method to append values - protected def doAppend(row: Row, ordinal: Int) + protected def doAppendFrom(row: Row, ordinal: Int) // Helper method to append primitive values (to avoid boxing cost) protected def appendValue(v: JvmType) { - _buffer = ensureFreeSpace(_buffer, columnType.actualSize(v)) - columnType.append(v, _buffer) + buffer = ensureFreeSpace(buffer, columnType.actualSize(v)) + columnType.append(v, buffer) } override def build() = { - _buffer.limit(_buffer.position()).rewind() - _buffer + buffer.limit(buffer.position()).rewind() + buffer } } abstract class 
NativeColumnBuilder[T <: NativeType](val columnType: NativeColumnType[T]) extends BasicColumnBuilder[T, T#JvmType] - with NullableColumnBuilder[T, T#JvmType] + with NullableColumnBuilder class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getBoolean(ordinal)) } } class IntColumnBuilder extends NativeColumnBuilder(INT) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getInt(ordinal)) } } class ShortColumnBuilder extends NativeColumnBuilder(SHORT) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getShort(ordinal)) } } class LongColumnBuilder extends NativeColumnBuilder(LONG) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getLong(ordinal)) } } class ByteColumnBuilder extends NativeColumnBuilder(BYTE) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getByte(ordinal)) } } class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getDouble(ordinal)) } } class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getFloat(ordinal)) } } class StringColumnBuilder extends NativeColumnBuilder(STRING) { - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getString(ordinal)) } } class BinaryColumnBuilder extends BasicColumnBuilder[BinaryType.type, Array[Byte]] - with NullableColumnBuilder[BinaryType.type, Array[Byte]] { + with NullableColumnBuilder { - val columnType = BINARY + def columnType = BINARY - override def doAppend(row: Row, ordinal: Int) { + override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row(ordinal).asInstanceOf[Array[Byte]]) } } // TODO (lian) Add support for array, struct and map class GenericColumnBuilder - extends BasicColumnBuilder[DataType, Any] - with NullableColumnBuilder[DataType, Any] { + extends BasicColumnBuilder[DataType, Array[Byte]] + with NullableColumnBuilder { + + def columnType = GENERIC + + override def doAppendFrom(row: Row, ordinal: Int) { + val serialized = KryoSerializer.serialize(row(ordinal)) + buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized)) + columnType.append(serialized, buffer) + } +} + +object ColumnBuilder { + val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104 + + private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { + if (orig.remaining >= size) { + orig + } else { + // grow in steps of initial size + val capacity = orig.capacity() + val newSize = capacity + size.max(capacity / 8 + 1) + val pos = orig.position() - val columnType = GENERIC + orig.clear() + ByteBuffer + .allocate(newSize) + .order(ByteOrder.nativeOrder()) + .put(orig.array(), 0, pos) + } + } + + def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = { + val builder = (typeId match { + case INT.typeId => new IntColumnBuilder + case LONG.typeId => new LongColumnBuilder + case FLOAT.typeId => new FloatColumnBuilder + case DOUBLE.typeId => new DoubleColumnBuilder + case 
BOOLEAN.typeId => new BooleanColumnBuilder + case BYTE.typeId => new ByteColumnBuilder + case SHORT.typeId => new ShortColumnBuilder + case STRING.typeId => new StringColumnBuilder + case BINARY.typeId => new BinaryColumnBuilder + case GENERIC.typeId => new GenericColumnBuilder + }).asInstanceOf[ColumnBuilder] + + builder.initialize(initialSize, columnName) + builder + } - override def doAppend(row: Row, ordinal: Int) { - appendValue(row(ordinal)) + def apply(dataType: DataType, initialSize: Int, columnName: String): ColumnBuilder = { + val builder = (dataType match { + case IntegerType => new IntColumnBuilder + case LongType => new LongColumnBuilder + case FloatType => new FloatColumnBuilder + case DoubleType => new DoubleColumnBuilder + case BooleanType => new BooleanColumnBuilder + case ByteType => new ByteColumnBuilder + case ShortType => new ShortColumnBuilder + case StringType => new StringColumnBuilder + case BinaryType => new BinaryColumnBuilder + case _ => new GenericColumnBuilder + }).asInstanceOf[ColumnBuilder] + + builder.initialize(initialSize, columnName) + builder } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 4c85f1c261dd9..7bbf7e2cf4a11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -4,7 +4,6 @@ package columnar import java.nio.ByteBuffer import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.KryoSerializer /** * An abstract class that represents type of a column. Used to append/extract Java objects into/from @@ -154,19 +153,20 @@ object BINARY extends ColumnType[BinaryType.type, Array[Byte]](8, 16) { } } -object GENERIC extends ColumnType[DataType, Any](9, 16) { - // TODO (lian) Must avoid duplicated serialization here. - override def actualSize(v: Any) = KryoSerializer.serialize(v).size +// Used process generic objects (all types other than those listed above). Objects should be +// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized +// byte array. 
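
An illustrative sketch (not part of the diff): after this change GENERIC no longer serializes values itself — callers pass pre-serialized byte arrays and deserialize what they extract, exactly as the updated ColumnTypeSuite below does. Roughly:

    import java.nio.ByteBuffer
    import org.apache.spark.sql.columnar.GENERIC
    import org.apache.spark.sql.execution.KryoSerializer

    val buffer = ByteBuffer.allocate(512)
    val obj    = Map(1 -> "spark", 2 -> "sql")

    GENERIC.append(KryoSerializer.serialize(obj), buffer)                 // serialize before append
    buffer.rewind()
    val back = KryoSerializer.deserialize[Any](GENERIC.extract(buffer))   // deserialize after extract
    assert(back == obj)
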
+object GENERIC extends ColumnType[DataType, Array[Byte]](9, 16) { + override def actualSize(v: Array[Byte]) = v.length + 4 - override def append(v: Any, buffer: ByteBuffer) { - val serialized = KryoSerializer.serialize(v) - buffer.putInt(serialized.length).put(serialized) + override def append(v: Array[Byte], buffer: ByteBuffer) { + buffer.putInt(v.length).put(v) } override def extract(buffer: ByteBuffer) = { val length = buffer.getInt() - val serialized = new Array[Byte](length) - buffer.get(serialized, 0, length) - KryoSerializer.deserialize[Any](serialized) + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + bytes } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala new file mode 100644 index 0000000000000..e57b23fbfcae4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -0,0 +1,40 @@ +package org.apache.spark.sql.columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.expressions.MutableRow + +trait NullableColumnAccessor extends ColumnAccessor { + private var nullsBuffer: ByteBuffer = _ + private var nullCount: Int = _ + private var seenNulls: Int = 0 + + private var nextNullIndex: Int = _ + private var pos: Int = 0 + + abstract override def initialize() { + nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder()) + nullCount = nullsBuffer.getInt() + nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1 + pos = 0 + + underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4) + super.initialize() + } + + abstract override def extractTo(row: MutableRow, ordinal: Int) { + if (pos == nextNullIndex) { + seenNulls += 1 + + if (seenNulls < nullCount) { + nextNullIndex = nullsBuffer.getInt() + } + + row.setNullAt(ordinal) + } else { + super.extractTo(row, ordinal) + } + + pos += 1 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala index 1a9a921222e03..d464e3711411b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -3,55 +3,64 @@ package columnar import java.nio.{ByteOrder, ByteBuffer} -import org.apache.spark.sql.catalyst.types.DataType - /** * Builds a nullable column. The byte buffer of a nullable column contains: * - 4 bytes for the null count (number of nulls) * - positions for each null, in ascending order * - the non-null data (column data type, compression type, data...) 
*/ -trait NullableColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { - import ColumnBuilder._ - - private var _nulls: ByteBuffer = _ - private var _pos: Int = _ - private var _nullCount: Int = _ +trait NullableColumnBuilder extends ColumnBuilder { + private var nulls: ByteBuffer = _ + private var pos: Int = _ + private var nullCount: Int = _ abstract override def initialize(initialSize: Int, columnName: String) { - _nulls = ByteBuffer.allocate(1024) - _nulls.order(ByteOrder.nativeOrder()) - _pos = 0 - _nullCount = 0 + nulls = ByteBuffer.allocate(1024) + nulls.order(ByteOrder.nativeOrder()) + pos = 0 + nullCount = 0 super.initialize(initialSize, columnName) } - abstract override def append(row: Row, ordinal: Int) { + abstract override def appendFrom(row: Row, ordinal: Int) { if (row.isNullAt(ordinal)) { - _nulls = ensureFreeSpace(_nulls, 4) - _nulls.putInt(_pos) - _nullCount += 1 + nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) + nulls.putInt(pos) + nullCount += 1 } else { - super.append(row, ordinal) + super.appendFrom(row, ordinal) } - _pos += 1 + pos += 1 } abstract override def build(): ByteBuffer = { val nonNulls = super.build() - val nullDataLen = _nulls.position() + val typeId = nonNulls.getInt() + val nullDataLen = nulls.position() - _nulls.limit(nullDataLen) - _nulls.rewind() + nulls.limit(nullDataLen) + nulls.rewind() - // 4 bytes for null count + null positions + non nulls - ByteBuffer + // Column type ID is moved to the front, follows the null count, then non-null data + // + // +---------+ + // | 4 bytes | Column type ID + // +---------+ + // | 4 bytes | Null count + // +---------+ + // | ... | Null positions (if null count is not zero) + // +---------+ + // | ... | Non-null part (without column type ID) + // +---------+ + val buffer = ByteBuffer .allocate(4 + nullDataLen + nonNulls.limit) .order(ByteOrder.nativeOrder()) - .putInt(_nullCount) - .put(_nulls) + .putInt(typeId) + .putInt(nullCount) + .put(nulls) .put(nonNulls) - .rewind() - .asInstanceOf[ByteBuffer] + + buffer.rewind() + buffer } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index 22c6fc5fb30fb..388c5298f1a82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -11,10 +11,10 @@ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.KryoSerializer class ColumnTypeSuite extends FunSuite { - val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING) + val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC) test("defaultSize") { - val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8) + val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16) columnTypes.zip(defaultSize).foreach { case (columnType, size) => assert(columnType.defaultSize === size) @@ -22,7 +22,7 @@ class ColumnTypeSuite extends FunSuite { } test("actualSize") { - val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5) + val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11) val actualSizes = Seq( INT.actualSize(Int.MaxValue), SHORT.actualSize(Short.MaxValue), @@ -30,7 +30,9 @@ class ColumnTypeSuite extends FunSuite { BYTE.actualSize(Byte.MaxValue), DOUBLE.actualSize(Double.MaxValue), FLOAT.actualSize(Float.MaxValue), - STRING.actualSize("hello")) + STRING.actualSize("hello"), + BINARY.actualSize(new Array[Byte](4)), + 
GENERIC.actualSize(KryoSerializer.serialize(Map(1 -> "a")))) expectedSizes.zip(actualSizes).foreach { case (expected, actual) => assert(expected === actual) @@ -153,7 +155,7 @@ class ColumnTypeSuite extends FunSuite { val obj = Map(1 -> "spark", 2 -> "sql") val serializedObj = KryoSerializer.serialize(obj) - GENERIC.append(obj, buffer) + GENERIC.append(KryoSerializer.serialize(obj), buffer) buffer.rewind() val length = buffer.getInt() @@ -167,7 +169,7 @@ class ColumnTypeSuite extends FunSuite { buffer.putInt(serializedObj.length).put(serializedObj) buffer.rewind() - assert(obj === GENERIC.extract(buffer)) + assert(obj === KryoSerializer.deserialize(GENERIC.extract(buffer))) } def testNumericColumnType[T <: DataType, JvmType]( @@ -177,7 +179,9 @@ class ColumnTypeSuite extends FunSuite { putter: (ByteBuffer, JvmType) => Unit, getter: (ByteBuffer) => JvmType) { - test(columnType.getClass.getSimpleName.stripSuffix("$")) { + val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$columnTypeName.extract") { buffer.rewind() seq.foreach(putter(buffer, _)) @@ -185,7 +189,9 @@ class ColumnTypeSuite extends FunSuite { seq.foreach { i => assert(i === columnType.extract(buffer)) } + } + test(s"$columnTypeName.append") { buffer.rewind() seq.foreach(columnType.append(_, buffer)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala new file mode 100644 index 0000000000000..b73dc4bd5e3cf --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala @@ -0,0 +1,38 @@ +package org.apache.spark.sql.columnar + +import scala.util.Random + +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow + +// TODO Enrich test data +object ColumnarTestData { + object GenericMutableRow { + def apply(values: Any*) = { + val row = new GenericMutableRow(values.length) + row.indices.foreach { i => + row(i) = values(i) + } + row + } + } + + def randomBytes(length: Int) = { + val bytes = new Array[Byte](length) + Random.nextBytes(bytes) + bytes + } + + val nonNullRandomRow = GenericMutableRow( + Random.nextInt(), + Random.nextLong(), + Random.nextFloat(), + Random.nextDouble(), + Random.nextBoolean(), + Random.nextInt(Byte.MaxValue).asInstanceOf[Byte], + Random.nextInt(Short.MaxValue).asInstanceOf[Short], + Random.nextString(Random.nextInt(64)), + randomBytes(Random.nextInt(64)), + Map(Random.nextInt() -> Random.nextString(4))) + + val nullRow = GenericMutableRow(Seq.fill(10)(null): _*) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala new file mode 100644 index 0000000000000..12b116177498e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -0,0 +1,51 @@ +package org.apache.spark.sql +package columnar + +import org.scalatest.FunSuite +import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow + +class NullableColumnAccessorSuite extends FunSuite { + import ColumnarTestData._ + + testNullableColumnAccessor(BOOLEAN) + testNullableColumnAccessor(INT) + testNullableColumnAccessor(SHORT) + testNullableColumnAccessor(LONG) + testNullableColumnAccessor(BYTE) + testNullableColumnAccessor(DOUBLE) + testNullableColumnAccessor(FLOAT) + testNullableColumnAccessor(STRING) + 
testNullableColumnAccessor(BINARY) + testNullableColumnAccessor(GENERIC) + + def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$typeName accessor: empty column") { + val builder = ColumnBuilder(columnType.typeId, 4) + val accessor = ColumnAccessor(builder.build()) + assert(!accessor.hasNext) + } + + test(s"$typeName accessor: access null values") { + val builder = ColumnBuilder(columnType.typeId, 4) + + (0 until 4).foreach { _ => + builder.appendFrom(nonNullRandomRow, columnType.typeId) + builder.appendFrom(nullRow, columnType.typeId) + } + + val accessor = ColumnAccessor(builder.build()) + val row = new GenericMutableRow(1) + + (0 until 4).foreach { _ => + accessor.extractTo(row, 0) + assert(row(0) === nonNullRandomRow(columnType.typeId)) + + accessor.extractTo(row, 0) + assert(row(0) === null) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala index ff28fc3d62915..26e0d4dfb8c66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -1,106 +1,81 @@ package org.apache.spark.sql package columnar -import scala.util.Random - import org.scalatest.FunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.execution.KryoSerializer class NullableColumnBuilderSuite extends FunSuite { - testNullableColumnBuilder(new IntColumnBuilder, INT) - testNullableColumnBuilder(new LongColumnBuilder, LONG) - testNullableColumnBuilder(new ShortColumnBuilder, SHORT) - testNullableColumnBuilder(new BooleanColumnBuilder, BOOLEAN) - testNullableColumnBuilder(new ByteColumnBuilder, BYTE) - testNullableColumnBuilder(new StringColumnBuilder, STRING) - testNullableColumnBuilder(new DoubleColumnBuilder, DOUBLE) - testNullableColumnBuilder(new FloatColumnBuilder, FLOAT) - testNullableColumnBuilder(new BinaryColumnBuilder, BINARY) - testNullableColumnBuilder(new GenericColumnBuilder, GENERIC) - - val nonNullTestRow = { - val row = new GenericMutableRow(10) - - row(INT.typeId) = Random.nextInt() - row(LONG.typeId) = Random.nextLong() - row(FLOAT.typeId) = Random.nextFloat() - row(DOUBLE.typeId) = Random.nextDouble() - row(BOOLEAN.typeId) = Random.nextBoolean() - row(BYTE.typeId) = Random.nextInt(Byte.MaxValue).asInstanceOf[Byte] - row(SHORT.typeId) = Random.nextInt(Short.MaxValue).asInstanceOf[Short] - row(STRING.typeId) = Random.nextString(4) - row(BINARY.typeId) = { - val bytes = new Array[Byte](4) - Random.nextBytes(bytes) - bytes - } - row(GENERIC.typeId) = Map(Random.nextInt() -> Random.nextString(4)) - - row - } - - val nullTestRow = { - val row = new GenericMutableRow(10) - (0 until 10).foreach(row.setNullAt) - row - } - - def testNullableColumnBuilder[T <: DataType, JvmType]( - columnBuilder: ColumnBuilder, - columnType: ColumnType[T, JvmType]) { - - val columnBuilderName = columnBuilder.getClass.getSimpleName - - test(s"$columnBuilderName: empty column") { + import ColumnarTestData._ + + testNullableColumnBuilder(INT) + testNullableColumnBuilder(LONG) + testNullableColumnBuilder(SHORT) + testNullableColumnBuilder(BOOLEAN) + testNullableColumnBuilder(BYTE) + testNullableColumnBuilder(STRING) + 
testNullableColumnBuilder(DOUBLE) + testNullableColumnBuilder(FLOAT) + testNullableColumnBuilder(BINARY) + testNullableColumnBuilder(GENERIC) + + def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { + val columnBuilder = ColumnBuilder(columnType.typeId) + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$typeName column builder: empty column") { columnBuilder.initialize(4) val buffer = columnBuilder.build() - // For null count - assert(buffer.getInt === 0) // For column type ID assert(buffer.getInt() === columnType.typeId) + // For null count + assert(buffer.getInt === 0) assert(!buffer.hasRemaining) } - test(s"$columnBuilderName: buffer size auto growth") { + test(s"$typeName column builder: buffer size auto growth") { columnBuilder.initialize(4) (0 until 4) foreach { _ => - columnBuilder.append(nonNullTestRow, columnType.typeId) + columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId) } val buffer = columnBuilder.build() - // For null count - assert(buffer.getInt() === 0) // For column type ID assert(buffer.getInt() === columnType.typeId) + // For null count + assert(buffer.getInt() === 0) } - test(s"$columnBuilderName: null values") { + test(s"$typeName column builder: null values") { columnBuilder.initialize(4) (0 until 4) foreach { _ => - columnBuilder.append(nonNullTestRow, columnType.typeId) - columnBuilder.append(nullTestRow, columnType.typeId) + columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId) + columnBuilder.appendFrom(nullRow, columnType.typeId) } val buffer = columnBuilder.build() + // For column type ID + assert(buffer.getInt() === columnType.typeId) // For null count assert(buffer.getInt() === 4) // For null positions (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i)) - // For column type ID - assert(buffer.getInt() === columnType.typeId) - // For non-null values (0 until 4).foreach { _ => - assert(columnType.extract(buffer) === nonNullTestRow(columnType.typeId)) + val actual = if (columnType == GENERIC) { + KryoSerializer.deserialize[Any](GENERIC.extract(buffer)) + } else { + columnType.extract(buffer) + } + assert(actual === nonNullRandomRow(columnType.typeId)) } assert(!buffer.hasRemaining) From da2f4d5d24ac17de85be87225790b7df5aa7e7b4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 18 Mar 2014 11:40:27 +0800 Subject: [PATCH 07/19] Added Apache license Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnAccessor.scala | 17 +++++++++++++++++ .../spark/sql/columnar/ColumnBuilder.scala | 17 +++++++++++++++++ .../apache/spark/sql/columnar/ColumnType.scala | 17 +++++++++++++++++ .../sql/columnar/NullableColumnAccessor.scala | 17 +++++++++++++++++ .../sql/columnar/NullableColumnBuilder.scala | 17 +++++++++++++++++ .../spark/sql/columnar/ColumnarTestData.scala | 17 +++++++++++++++++ .../columnar/NullableColumnAccessorSuite.scala | 17 +++++++++++++++++ .../columnar/NullableColumnBuilderSuite.scala | 17 +++++++++++++++++ 8 files changed, 136 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala index 528a15ec38213..6f31864673aeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql package columnar diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 21b2dcd3a55cf..b6902c9450152 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql package columnar diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 7bbf7e2cf4a11..ad5198111ab9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.sql package columnar diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala index e57b23fbfcae4..eb02022f1408d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.columnar import java.nio.{ByteOrder, ByteBuffer} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala index d464e3711411b..b0ae3bf8db39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql package columnar diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala index b73dc4bd5e3cf..ddcdede8d1a4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.columnar import scala.util.Random diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala index 12b116177498e..ccd0446e6ec90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql package columnar diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala index 26e0d4dfb8c66..7e0946d9dd374 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.sql package columnar From 214be73c872c573dea1468abb71efa6a8faad0f6 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 18 Mar 2014 14:56:41 +0800 Subject: [PATCH 08/19] Refactored BINARY and GENERIC to reduce duplicate code Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnType.scala | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index ad5198111ab9d..38245bee29000 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.sql package columnar @@ -155,7 +154,9 @@ object STRING extends NativeColumnType(StringType, 7, 8) { } } -object BINARY extends ColumnType[BinaryType.type, Array[Byte]](8, 16) { +sealed abstract class ByteArrayColumnType[T <: DataType](typeId: Int, defaultSize: Int) + extends ColumnType[T, Array[Byte]](typeId, defaultSize) { + override def actualSize(v: Array[Byte]) = v.length + 4 override def append(v: Array[Byte], buffer: ByteBuffer) { @@ -170,20 +171,9 @@ object BINARY extends ColumnType[BinaryType.type, Array[Byte]](8, 16) { } } +object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) + // Used process generic objects (all types other than those listed above). Objects should be // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized // byte array. -object GENERIC extends ColumnType[DataType, Array[Byte]](9, 16) { - override def actualSize(v: Array[Byte]) = v.length + 4 - - override def append(v: Array[Byte], buffer: ByteBuffer) { - buffer.putInt(v.length).put(v) - } - - override def extract(buffer: ByteBuffer) = { - val length = buffer.getInt() - val bytes = new Array[Byte](length) - buffer.get(bytes, 0, length) - bytes - } -} +object GENERIC extends ByteArrayColumnType[DataType](9, 16) From b6c0a49cb5d2bbe798d6d4ffc98ab16252b2fd27 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 19 Mar 2014 16:33:38 +0800 Subject: [PATCH 09/19] Minor test suite refactoring Signed-off-by: Cheng Lian --- .../sql/columnar/NullableColumnAccessorSuite.scala | 13 +++---------- .../sql/columnar/NullableColumnBuilderSuite.scala | 13 +++---------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala index ccd0446e6ec90..279607ccfaa5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -25,16 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.GenericMutableRow class NullableColumnAccessorSuite extends FunSuite { import ColumnarTestData._ - testNullableColumnAccessor(BOOLEAN) - testNullableColumnAccessor(INT) - testNullableColumnAccessor(SHORT) - testNullableColumnAccessor(LONG) - testNullableColumnAccessor(BYTE) - testNullableColumnAccessor(DOUBLE) - testNullableColumnAccessor(FLOAT) - testNullableColumnAccessor(STRING) - testNullableColumnAccessor(BINARY) - testNullableColumnAccessor(GENERIC) + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, 
STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + testNullableColumnAccessor(_) + } def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { val typeName = columnType.getClass.getSimpleName.stripSuffix("$") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala index 7e0946d9dd374..7d39af8f4b6f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -26,16 +26,9 @@ import org.apache.spark.sql.execution.KryoSerializer class NullableColumnBuilderSuite extends FunSuite { import ColumnarTestData._ - testNullableColumnBuilder(INT) - testNullableColumnBuilder(LONG) - testNullableColumnBuilder(SHORT) - testNullableColumnBuilder(BOOLEAN) - testNullableColumnBuilder(BYTE) - testNullableColumnBuilder(STRING) - testNullableColumnBuilder(DOUBLE) - testNullableColumnBuilder(FLOAT) - testNullableColumnBuilder(BINARY) - testNullableColumnBuilder(GENERIC) + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + testNullableColumnBuilder(_) + } def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { val columnBuilder = ColumnBuilder(columnType.typeId) From b8a645a35bd61a619ed05b97d4b8cab3f422aa0f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 19 Mar 2014 17:41:21 +0800 Subject: [PATCH 10/19] Replaced KryoSerializer with an updated SparkSqlSerializer * SparkSqlSerializer is moved to a separate source file * SparkSqlSerializer.newKryo calls super.newKryo * Class registration is no longer required, since we may de/serialise objects of any class with generic column accessor/builder. Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnAccessor.scala | 4 +- .../spark/sql/columnar/ColumnBuilder.scala | 4 +- .../apache/spark/sql/execution/Exchange.scala | 34 +----------- .../spark/sql/execution/KryoSerializer.scala | 22 -------- .../sql/execution/SparkSqlSerializer.scala | 53 +++++++++++++++++++ .../spark/sql/columnar/ColumnTypeSuite.scala | 14 ++--- .../columnar/NullableColumnBuilderSuite.scala | 4 +- 7 files changed, 67 insertions(+), 68 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala index 6f31864673aeb..b768fb7974634 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -22,7 +22,7 @@ import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType} import org.apache.spark.sql.catalyst.expressions.MutableRow -import org.apache.spark.sql.execution.KryoSerializer +import org.apache.spark.sql.execution.SparkSqlSerializer /** * An `Iterator` like trait used to extract values from columnar byte buffer. 
When a value is @@ -133,7 +133,7 @@ class GenericColumnAccessor(buffer: ByteBuffer) override protected def doExtractTo(row: MutableRow, ordinal: Int) { val serialized = columnType.extract(buffer) - row(ordinal) = KryoSerializer.deserialize[Any](serialized) + row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index b6902c9450152..89f412a43a706 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -21,7 +21,7 @@ package columnar import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.KryoSerializer +import org.apache.spark.sql.execution.SparkSqlSerializer trait ColumnBuilder { /** @@ -140,7 +140,7 @@ class GenericColumnBuilder def columnType = GENERIC override def doAppendFrom(row: Row, ordinal: Int) { - val serialized = KryoSerializer.serialize(row(ordinal)) + val serialized = SparkSqlSerializer.serialize(row(ordinal)) buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized)) columnType.append(serialized, buffer) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 6c13c537cd14e..f0926c5f529f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -18,47 +18,15 @@ package org.apache.spark.sql package execution -import java.nio.ByteBuffer - -import com.esotericsoftware.kryo.{Kryo, Serializer} -import com.esotericsoftware.kryo.io.{Output, Input} - import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner} import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} import org.apache.spark.util.MutablePair import catalyst.rules.Rule import catalyst.errors._ import catalyst.expressions._ import catalyst.plans.physical._ - -private class SparkSqlSerializer(conf: SparkConf) extends SparkKryoSerializer(conf) { - override def newKryo(): Kryo = { - val kryo = new Kryo - kryo.setRegistrationRequired(true) - kryo.register(classOf[MutablePair[_,_]]) - kryo.register(classOf[Array[Any]]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) - kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) - kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) - kryo.setReferences(false) - kryo.setClassLoader(this.getClass.getClassLoader) - kryo - } -} - -private class BigDecimalSerializer extends Serializer[BigDecimal] { - def write(kryo: Kryo, output: Output, bd: math.BigDecimal) { - // TODO: There are probably more efficient representations than strings... 
- output.writeString(bd.toString) - } - - def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = { - BigDecimal(input.readString()) - } -} +import execution.SparkSqlSerializer case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala deleted file mode 100644 index fdc9c3872798f..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/KryoSerializer.scala +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.spark.sql -package execution - -import java.nio.ByteBuffer - -import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} -import org.apache.spark.{SparkConf, SparkEnv} - -object KryoSerializer { - @transient lazy val ser: SparkKryoSerializer = { - val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) - new SparkKryoSerializer(sparkConf) - } - - def serialize[T](o: T): Array[Byte] = { - ser.newInstance().serialize(o).array() - } - - def deserialize[T](bytes: Array[Byte]): T = { - ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala new file mode 100644 index 0000000000000..33b664c3556f5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -0,0 +1,53 @@ +package org.apache.spark.sql +package execution + +import java.nio.ByteBuffer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Serializer, Kryo} + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.MutablePair + +class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { + override def newKryo(): Kryo = { + val kryo = super.newKryo() + kryo.setRegistrationRequired(false) + kryo.register(classOf[MutablePair[_,_]]) + kryo.register(classOf[Array[Any]]) + kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) + kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) + kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) + kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) + kryo.setReferences(false) + kryo.setClassLoader(this.getClass.getClassLoader) + kryo + } +} + +object SparkSqlSerializer { + @transient lazy val ser: SparkSqlSerializer = { + val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + new SparkSqlSerializer(sparkConf) + } + + def serialize[T](o: T): Array[Byte] = { + ser.newInstance().serialize(o).array() + } + + def deserialize[T](bytes: Array[Byte]): T = { + ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) + } +} + +class BigDecimalSerializer extends Serializer[BigDecimal] { + def write(kryo: Kryo, output: Output, bd: math.BigDecimal) { + // TODO: There are probably more efficient representations than strings... 
+ output.writeString(bd.toString()) + } + + def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = { + BigDecimal(input.readString()) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index 388c5298f1a82..ba62f2da7123b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -8,7 +8,7 @@ import scala.util.Random import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.KryoSerializer +import org.apache.spark.sql.execution.SparkSqlSerializer class ColumnTypeSuite extends FunSuite { val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC) @@ -22,7 +22,7 @@ class ColumnTypeSuite extends FunSuite { } test("actualSize") { - val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11) + val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 8) val actualSizes = Seq( INT.actualSize(Int.MaxValue), SHORT.actualSize(Short.MaxValue), @@ -32,7 +32,7 @@ class ColumnTypeSuite extends FunSuite { FLOAT.actualSize(Float.MaxValue), STRING.actualSize("hello"), BINARY.actualSize(new Array[Byte](4)), - GENERIC.actualSize(KryoSerializer.serialize(Map(1 -> "a")))) + GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a")))) expectedSizes.zip(actualSizes).foreach { case (expected, actual) => assert(expected === actual) @@ -153,9 +153,9 @@ class ColumnTypeSuite extends FunSuite { test("GENERIC") { val buffer = ByteBuffer.allocate(512) val obj = Map(1 -> "spark", 2 -> "sql") - val serializedObj = KryoSerializer.serialize(obj) + val serializedObj = SparkSqlSerializer.serialize(obj) - GENERIC.append(KryoSerializer.serialize(obj), buffer) + GENERIC.append(SparkSqlSerializer.serialize(obj), buffer) buffer.rewind() val length = buffer.getInt() @@ -163,13 +163,13 @@ class ColumnTypeSuite extends FunSuite { val bytes = new Array[Byte](length) buffer.get(bytes, 0, length) - assert(obj === KryoSerializer.deserialize(bytes)) + assert(obj === SparkSqlSerializer.deserialize(bytes)) buffer.rewind() buffer.putInt(serializedObj.length).put(serializedObj) buffer.rewind() - assert(obj === KryoSerializer.deserialize(GENERIC.extract(buffer))) + assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer))) } def testNumericColumnType[T <: DataType, JvmType]( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala index 7d39af8f4b6f7..3354da3fa3e0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -21,7 +21,7 @@ package columnar import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types.DataType -import org.apache.spark.sql.execution.KryoSerializer +import org.apache.spark.sql.execution.SparkSqlSerializer class NullableColumnBuilderSuite extends FunSuite { import ColumnarTestData._ @@ -81,7 +81,7 @@ class NullableColumnBuilderSuite extends FunSuite { // For non-null values (0 until 4).foreach { _ => val actual = if (columnType == GENERIC) { - KryoSerializer.deserialize[Any](GENERIC.extract(buffer)) + SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer)) } else { 
columnType.extract(buffer) } From ed8608e5bfe52b1cbd4d6abdb7ce8854831290cb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 19 Mar 2014 17:41:57 +0800 Subject: [PATCH 11/19] Added implicit conversion from DataType to ColumnType Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnBuilder.scala | 18 ------------------ .../spark/sql/columnar/ColumnType.scala | 19 ++++++++++++++++++- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 89f412a43a706..72e99207f6bd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -183,22 +183,4 @@ object ColumnBuilder { builder.initialize(initialSize, columnName) builder } - - def apply(dataType: DataType, initialSize: Int, columnName: String): ColumnBuilder = { - val builder = (dataType match { - case IntegerType => new IntColumnBuilder - case LongType => new LongColumnBuilder - case FloatType => new FloatColumnBuilder - case DoubleType => new DoubleColumnBuilder - case BooleanType => new BooleanColumnBuilder - case ByteType => new ByteColumnBuilder - case ShortType => new ShortColumnBuilder - case StringType => new StringColumnBuilder - case BinaryType => new BinaryColumnBuilder - case _ => new GenericColumnBuilder - }).asInstanceOf[ColumnBuilder] - - builder.initialize(initialSize, columnName) - builder - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 38245bee29000..e68debb4b1e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -173,7 +173,24 @@ sealed abstract class ByteArrayColumnType[T <: DataType](typeId: Int, defaultSiz object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) -// Used process generic objects (all types other than those listed above). Objects should be +// Used to process generic objects (all types other than those listed above). Objects should be // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized // byte array. 
object GENERIC extends ByteArrayColumnType[DataType](9, 16) + +object ColumnType { + implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = { + dataType match { + case IntegerType => INT + case LongType => LONG + case FloatType => FLOAT + case DoubleType => DOUBLE + case BooleanType => BOOLEAN + case ByteType => BYTE + case ShortType => SHORT + case StringType => STRING + case BinaryType => BINARY + case _ => GENERIC + } + } +} From c701c7ad21d9da4e2479d32cafa5467a3f649cb7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 20 Mar 2014 03:26:44 +0800 Subject: [PATCH 12/19] Using SparkSqlSerializer for generic object SerDe causes error, made a workaround Signed-off-by: Cheng Lian --- .../spark/sql/execution/SparkSqlSerializer.scala | 10 ++++++---- .../apache/spark/sql/columnar/ColumnTypeSuite.scala | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 33b664c3556f5..173b12f7d301a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -12,9 +12,9 @@ import org.apache.spark.util.MutablePair class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { override def newKryo(): Kryo = { - val kryo = super.newKryo() + val kryo = new Kryo() kryo.setRegistrationRequired(false) - kryo.register(classOf[MutablePair[_,_]]) + kryo.register(classOf[MutablePair[_, _]]) kryo.register(classOf[Array[Any]]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) @@ -27,9 +27,11 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { } object SparkSqlSerializer { - @transient lazy val ser: SparkSqlSerializer = { + // TODO (lian) Using KryoSerializer here is workaround, needs further investigation + // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization related error. + @transient lazy val ser: KryoSerializer = { val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) - new SparkSqlSerializer(sparkConf) + new KryoSerializer(sparkConf) } def serialize[T](o: T): Array[Byte] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index ba62f2da7123b..c7aaaae94ee5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -22,7 +22,7 @@ class ColumnTypeSuite extends FunSuite { } test("actualSize") { - val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 8) + val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11) val actualSizes = Seq( INT.actualSize(Int.MaxValue), SHORT.actualSize(Short.MaxValue), From 220ee1e78f256cc4a04cede091d1a2c04ce3df29 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 20 Mar 2014 03:28:58 +0800 Subject: [PATCH 13/19] Added table scan operator for in-memory columnar support. 
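InMemoryColumnarTableScan caches the child plan's output as columnar byte
buffers (one builder per output attribute) the first time it is executed;
later scans replay those buffers through column accessors instead of
re-running the child plan. A minimal usage sketch, mirroring the
ColumnarQuerySuite test added in this patch (TestSqlContext and testData
are the existing test fixtures):

    val plan = TestSqlContext.executePlan(testData).executedPlan
    val relation = InMemoryColumnarRelation("t1", plan)
    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, relation))
    // `scan` reads from the cached column buffers; checkAnswer(scan, testData.data)
    // verifies it returns the same rows as the original plan.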
Signed-off-by: Cheng Lian --- .../columnar/InMemoryColumnarRelation.scala | 32 ++++++++ .../columnar/inMemoryColumnarOperators.scala | 79 +++++++++++++++++++ .../sql/columnar/ColumnarQuerySuite.scala | 17 ++++ 3 files changed, 128 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala new file mode 100644 index 0000000000000..9586b8fa22ecc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package columnar + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.execution.SparkPlan + +/** + * A simple class that converts any SharkPlan into an in-memory columnar relation. + */ +case class InMemoryColumnarRelation(tableName: String, child: SparkPlan) + extends BaseRelation { + + override def output: Seq[Attribute] = child.output +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala new file mode 100644 index 0000000000000..967cf417f2d40 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql +package columnar + +import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} +import org.apache.spark.sql.columnar.ColumnType._ +import org.apache.spark.sql.execution.LeafNode + +case class InMemoryColumnarTableScan( + attributes: Seq[Attribute], + relation: InMemoryColumnarRelation) + extends LeafNode { + + override def output: Seq[Attribute] = attributes + + lazy val cachedColumnBuffers = { + val cached = relation.child.execute().mapPartitions { iterator => + val columnBuilders = relation.child.output.map { a => + ColumnBuilder(a.dataType.typeId, 0, a.name) + }.toArray + + var row: Row = null + while (iterator.hasNext) { + row = iterator.next() + var i = 0 + while (i < row.length) { + columnBuilders(i).appendFrom(row, i) + i += 1 + } + } + + Iterator.single(columnBuilders.map(_.build())) + }.cache() + + cached.setName(relation.child.toString) + // Force the materialization of the cached RDD. + cached.count() + cached + } + + override def execute() = { + cachedColumnBuffers.mapPartitions { iterator => + val columnBuffers = iterator.next() + assert(!iterator.hasNext) + + new Iterator[Row] { + val columnAccessors = columnBuffers.map(buffer => ColumnAccessor(buffer)) + val nextRow = new GenericMutableRow(columnAccessors.length) + + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } + + override def hasNext = columnAccessors.head.hasNext + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala new file mode 100644 index 0000000000000..ff53f5fb3c668 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -0,0 +1,17 @@ +package org.apache.spark.sql.columnar + +import org.apache.spark.sql.{TestData, TestSqlContext, DslQueryTest} +import org.apache.spark.sql.execution.SparkLogicalPlan + +class ColumnarQuerySuite extends DslQueryTest { + import TestData._ + + test("simple columnar query") { + val plan = TestSqlContext.executePlan(testData).executedPlan + val attributes = plan.output + val relation = InMemoryColumnarRelation("t1", plan) + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(attributes, relation)) + + checkAnswer(scan, testData.data) + } +} From 9bcae4b8cb5a5c0a750d5d620395742af16376fb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 20 Mar 2014 17:01:22 +0800 Subject: [PATCH 14/19] Added Apache license Signed-off-by: Cheng Lian --- .../spark/sql/columnar/ColumnarQuerySuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala index ff53f5fb3c668..754f9d22647ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.columnar import org.apache.spark.sql.{TestData, TestSqlContext, DslQueryTest} From a162d4dc63711cbb48b35aeea6d5247ae1769fe4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 21 Mar 2014 16:19:49 +0800 Subject: [PATCH 15/19] Removed the unnecessary InMemoryColumnarRelation class Signed-off-by: Cheng Lian --- .../columnar/InMemoryColumnarRelation.scala | 32 ------------------- .../columnar/inMemoryColumnarOperators.scala | 19 +++++------ .../sql/columnar/ColumnarQuerySuite.scala | 4 +-- 3 files changed, 11 insertions(+), 44 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala deleted file mode 100644 index 9586b8fa22ecc..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarRelation.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql -package columnar - -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation -import org.apache.spark.sql.execution.SparkPlan - -/** - * A simple class that converts any SharkPlan into an in-memory columnar relation. 
- */ -case class InMemoryColumnarRelation(tableName: String, child: SparkPlan) - extends BaseRelation { - - override def output: Seq[Attribute] = child.output -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala index 967cf417f2d40..ffa7c16492295 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala @@ -19,19 +19,20 @@ package org.apache.spark.sql package columnar import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} -import org.apache.spark.sql.columnar.ColumnType._ -import org.apache.spark.sql.execution.LeafNode +import org.apache.spark.sql.execution.{SparkPlan, LeafNode} -case class InMemoryColumnarTableScan( - attributes: Seq[Attribute], - relation: InMemoryColumnarRelation) +case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan) extends LeafNode { + // For implicit conversion from `DataType` to `ColumnType` + import ColumnType._ + override def output: Seq[Attribute] = attributes lazy val cachedColumnBuffers = { - val cached = relation.child.execute().mapPartitions { iterator => - val columnBuilders = relation.child.output.map { a => + val output = child.output + val cached = child.execute().mapPartitions { iterator => + val columnBuilders = output.map { a => ColumnBuilder(a.dataType.typeId, 0, a.name) }.toArray @@ -48,7 +49,7 @@ case class InMemoryColumnarTableScan( Iterator.single(columnBuilders.map(_.build())) }.cache() - cached.setName(relation.child.toString) + cached.setName(child.toString) // Force the materialization of the cached RDD. 
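      // (count() is an action, so it eagerly computes every partition, and the cache() call above keeps the built column buffers in memory.)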
cached.count() cached @@ -60,7 +61,7 @@ case class InMemoryColumnarTableScan( assert(!iterator.hasNext) new Iterator[Row] { - val columnAccessors = columnBuffers.map(buffer => ColumnAccessor(buffer)) + val columnAccessors = columnBuffers.map(ColumnAccessor(_)) val nextRow = new GenericMutableRow(columnAccessors.length) override def next() = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala index 754f9d22647ec..b795c65456494 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -25,9 +25,7 @@ class ColumnarQuerySuite extends DslQueryTest { test("simple columnar query") { val plan = TestSqlContext.executePlan(testData).executedPlan - val attributes = plan.output - val relation = InMemoryColumnarRelation("t1", plan) - val scan = SparkLogicalPlan(InMemoryColumnarTableScan(attributes, relation)) + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) checkAnswer(scan, testData.data) } From 0dbf2fb46a56d03a458d84794b7335c6a8c5eba4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 22 Mar 2014 10:41:16 +0800 Subject: [PATCH 16/19] Make necessary renaming due to rebase Signed-off-by: Cheng Lian --- .../org/apache/spark/sql/columnar/ColumnarQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala index b795c65456494..a24684f9a950c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.columnar -import org.apache.spark.sql.{TestData, TestSqlContext, DslQueryTest} +import org.apache.spark.sql.{TestData, TestSQLContext, DslQueryTest} import org.apache.spark.sql.execution.SparkLogicalPlan class ColumnarQuerySuite extends DslQueryTest { import TestData._ test("simple columnar query") { - val plan = TestSqlContext.executePlan(testData).executedPlan + val plan = TestSQLContext.executePlan(testData).executedPlan val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) checkAnswer(scan, testData.data) From af1ad5ed46b24996e2dc59f52e55f9fa60bf76eb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 22 Mar 2014 17:07:50 +0800 Subject: [PATCH 17/19] Fixed some minor issues introduced during rebasing --- .../org/apache/spark/sql/execution/Exchange.scala | 1 - .../test/scala/org/apache/spark/sql/QueryTest.scala | 2 +- .../apache/spark/sql/columnar/ColumnarQuerySuite.scala | 10 ++++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index f0926c5f529f2..e934c4cf69ab5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -26,7 +26,6 @@ import catalyst.rules.Rule import catalyst.errors._ import catalyst.expressions._ import catalyst.plans.physical._ -import execution.SparkSqlSerializer case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 728fecededeb0..aa84211648db0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -33,7 +33,7 @@ import TestSQLContext._ class QueryTest extends FunSuite { /** * Runs the plan and makes sure the answer matches the expected result. - * @param plan the query to be executed + * @param rdd the [[SchemaRDD]] to be executed * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ]. */ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala index a24684f9a950c..928851a385d41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -17,16 +17,18 @@ package org.apache.spark.sql.columnar -import org.apache.spark.sql.{TestData, TestSQLContext, DslQueryTest} import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{TestData, DslQuerySuite} -class ColumnarQuerySuite extends DslQueryTest { +class ColumnarQuerySuite extends DslQuerySuite { import TestData._ + import TestSQLContext._ test("simple columnar query") { - val plan = TestSQLContext.executePlan(testData).executedPlan + val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) - checkAnswer(scan, testData.data) + checkAnswer(scan, testData.collect().toSeq) } } From 0892ad86bc8a9c99817e5c0a846c241b74df3f16 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 22 Mar 2014 18:12:47 +0800 Subject: [PATCH 18/19] Addressed ScalaStyle issues --- .../sql/execution/SparkSqlSerializer.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 173b12f7d301a..ad7cd58b6aaaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.sql package execution @@ -28,7 +45,8 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { object SparkSqlSerializer { // TODO (lian) Using KryoSerializer here is workaround, needs further investigation - // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization related error. + // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization + // related error. @transient lazy val ser: KryoSerializer = { val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) new KryoSerializer(sparkConf) From 99dba4180e79329d29789074f7c324cfe46b3bd1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 23 Mar 2014 13:40:11 +0800 Subject: [PATCH 19/19] Restricted new objects/classes to `private[sql]' --- .gitignore | 1 - .../spark/sql/columnar/ColumnAccessor.scala | 44 +++++++++++++------ .../spark/sql/columnar/ColumnBuilder.scala | 29 ++++++------ .../spark/sql/columnar/ColumnType.scala | 30 +++++++------ .../sql/columnar/NullableColumnAccessor.scala | 2 +- .../sql/columnar/NullableColumnBuilder.scala | 2 +- .../columnar/inMemoryColumnarOperators.scala | 2 +- 7 files changed, 64 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index d726fa73e458a..3d178992123da 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,3 @@ dist/ spark-*-bin.tar.gz unit-tests.log /lib/ -sql/hive/src/test/resources diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala index b768fb7974634..ddbeba6203aa4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.SparkSqlSerializer * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods * for primitive values provided by [[MutableRow]]. 
*/ -trait ColumnAccessor { +private[sql] trait ColumnAccessor { initialize() protected def initialize() @@ -42,7 +42,7 @@ trait ColumnAccessor { protected def underlyingBuffer: ByteBuffer } -abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer) +private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer) extends ColumnAccessor { protected def initialize() {} @@ -60,61 +60,77 @@ abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer) protected def underlyingBuffer = buffer } -abstract class NativeColumnAccessor[T <: NativeType]( +private[sql] abstract class NativeColumnAccessor[T <: NativeType]( buffer: ByteBuffer, val columnType: NativeColumnType[T]) extends BasicColumnAccessor[T, T#JvmType](buffer) with NullableColumnAccessor -class BooleanColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, BOOLEAN) { +private[sql] class BooleanColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BOOLEAN) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setBoolean(ordinal, columnType.extract(buffer)) } } -class IntColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, INT) { +private[sql] class IntColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, INT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setInt(ordinal, columnType.extract(buffer)) } } -class ShortColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, SHORT) { +private[sql] class ShortColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, SHORT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setShort(ordinal, columnType.extract(buffer)) } } -class LongColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, LONG) { +private[sql] class LongColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, LONG) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setLong(ordinal, columnType.extract(buffer)) } } -class ByteColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, BYTE) { +private[sql] class ByteColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BYTE) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setByte(ordinal, columnType.extract(buffer)) } } -class DoubleColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, DOUBLE) { +private[sql] class DoubleColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, DOUBLE) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setDouble(ordinal, columnType.extract(buffer)) } } -class FloatColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, FLOAT) { +private[sql] class FloatColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, FLOAT) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setFloat(ordinal, columnType.extract(buffer)) } } -class StringColumnAccessor(buffer: ByteBuffer) extends NativeColumnAccessor(buffer, STRING) { +private[sql] class StringColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, STRING) { + override protected def doExtractTo(row: MutableRow, ordinal: Int) { row.setString(ordinal, columnType.extract(buffer)) } } -class BinaryColumnAccessor(buffer: ByteBuffer) +private[sql] class BinaryColumnAccessor(buffer: ByteBuffer) extends 
BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer) with NullableColumnAccessor { @@ -125,7 +141,7 @@ class BinaryColumnAccessor(buffer: ByteBuffer) } } -class GenericColumnAccessor(buffer: ByteBuffer) +private[sql] class GenericColumnAccessor(buffer: ByteBuffer) extends BasicColumnAccessor[DataType, Array[Byte]](buffer) with NullableColumnAccessor { @@ -137,7 +153,7 @@ class GenericColumnAccessor(buffer: ByteBuffer) } } -object ColumnAccessor { +private[sql] object ColumnAccessor { def apply(b: ByteBuffer): ColumnAccessor = { // The first 4 bytes in the buffer indicates the column type. val buffer = b.duplicate().order(ByteOrder.nativeOrder()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 72e99207f6bd5..6bd1841821875 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -23,7 +23,7 @@ import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.SparkSqlSerializer -trait ColumnBuilder { +private[sql] trait ColumnBuilder { /** * Initializes with an approximate lower bound on the expected number of elements in this column. */ @@ -34,7 +34,7 @@ trait ColumnBuilder { def build(): ByteBuffer } -abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { +private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { import ColumnBuilder._ private var columnName: String = _ @@ -69,59 +69,60 @@ abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder } } -abstract class NativeColumnBuilder[T <: NativeType](val columnType: NativeColumnType[T]) +private[sql] abstract class NativeColumnBuilder[T <: NativeType]( + val columnType: NativeColumnType[T]) extends BasicColumnBuilder[T, T#JvmType] with NullableColumnBuilder -class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) { +private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getBoolean(ordinal)) } } -class IntColumnBuilder extends NativeColumnBuilder(INT) { +private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getInt(ordinal)) } } -class ShortColumnBuilder extends NativeColumnBuilder(SHORT) { +private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getShort(ordinal)) } } -class LongColumnBuilder extends NativeColumnBuilder(LONG) { +private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getLong(ordinal)) } } -class ByteColumnBuilder extends NativeColumnBuilder(BYTE) { +private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getByte(ordinal)) } } -class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) { +private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getDouble(ordinal)) } } -class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) { +private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) { override def 
doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getFloat(ordinal)) } } -class StringColumnBuilder extends NativeColumnBuilder(STRING) { +private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) { override def doAppendFrom(row: Row, ordinal: Int) { appendValue(row.getString(ordinal)) } } -class BinaryColumnBuilder +private[sql] class BinaryColumnBuilder extends BasicColumnBuilder[BinaryType.type, Array[Byte]] with NullableColumnBuilder { @@ -133,7 +134,7 @@ class BinaryColumnBuilder } // TODO (lian) Add support for array, struct and map -class GenericColumnBuilder +private[sql] class GenericColumnBuilder extends BasicColumnBuilder[DataType, Array[Byte]] with NullableColumnBuilder { @@ -146,7 +147,7 @@ class GenericColumnBuilder } } -object ColumnBuilder { +private[sql] object ColumnBuilder { val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104 private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index e68debb4b1e76..3b759a51cc695 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types._ * @tparam T Scala data type for the column. * @tparam JvmType Underlying Java type to represent the elements. */ -sealed abstract class ColumnType[T <: DataType, JvmType]( +private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( val typeId: Int, val defaultSize: Int) { @@ -56,7 +56,7 @@ sealed abstract class ColumnType[T <: DataType, JvmType]( def clone(v: JvmType): JvmType = v } -private[columnar] abstract class NativeColumnType[T <: NativeType]( +private[sql] abstract class NativeColumnType[T <: NativeType]( val dataType: T, typeId: Int, defaultSize: Int) @@ -68,7 +68,7 @@ private[columnar] abstract class NativeColumnType[T <: NativeType]( def scalaTag = dataType.tag } -object INT extends NativeColumnType(IntegerType, 0, 4) { +private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { def append(v: Int, buffer: ByteBuffer) { buffer.putInt(v) } @@ -78,7 +78,7 @@ object INT extends NativeColumnType(IntegerType, 0, 4) { } } -object LONG extends NativeColumnType(LongType, 1, 8) { +private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { override def append(v: Long, buffer: ByteBuffer) { buffer.putLong(v) } @@ -88,7 +88,7 @@ object LONG extends NativeColumnType(LongType, 1, 8) { } } -object FLOAT extends NativeColumnType(FloatType, 2, 4) { +private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { override def append(v: Float, buffer: ByteBuffer) { buffer.putFloat(v) } @@ -98,7 +98,7 @@ object FLOAT extends NativeColumnType(FloatType, 2, 4) { } } -object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { +private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { override def append(v: Double, buffer: ByteBuffer) { buffer.putDouble(v) } @@ -108,7 +108,7 @@ object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { } } -object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { +private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { override def append(v: Boolean, buffer: ByteBuffer) { buffer.put(if (v) 1.toByte else 0.toByte) } @@ -118,7 +118,7 @@ object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { } } -object BYTE extends NativeColumnType(ByteType, 5, 1) { 
+private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { override def append(v: Byte, buffer: ByteBuffer) { buffer.put(v) } @@ -128,7 +128,7 @@ object BYTE extends NativeColumnType(ByteType, 5, 1) { } } -object SHORT extends NativeColumnType(ShortType, 6, 2) { +private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { override def append(v: Short, buffer: ByteBuffer) { buffer.putShort(v) } @@ -138,7 +138,7 @@ object SHORT extends NativeColumnType(ShortType, 6, 2) { } } -object STRING extends NativeColumnType(StringType, 7, 8) { +private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { override def actualSize(v: String): Int = v.getBytes.length + 4 override def append(v: String, buffer: ByteBuffer) { @@ -154,7 +154,9 @@ object STRING extends NativeColumnType(StringType, 7, 8) { } } -sealed abstract class ByteArrayColumnType[T <: DataType](typeId: Int, defaultSize: Int) +private[sql] sealed abstract class ByteArrayColumnType[T <: DataType]( + typeId: Int, + defaultSize: Int) extends ColumnType[T, Array[Byte]](typeId, defaultSize) { override def actualSize(v: Array[Byte]) = v.length + 4 @@ -171,14 +173,14 @@ sealed abstract class ByteArrayColumnType[T <: DataType](typeId: Int, defaultSiz } } -object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) +private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) // Used to process generic objects (all types other than those listed above). Objects should be // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized // byte array. -object GENERIC extends ByteArrayColumnType[DataType](9, 16) +private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) -object ColumnType { +private[sql] object ColumnType { implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = { dataType match { case IntegerType => INT diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala index eb02022f1408d..2970c609b928d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -21,7 +21,7 @@ import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.expressions.MutableRow -trait NullableColumnAccessor extends ColumnAccessor { +private[sql] trait NullableColumnAccessor extends ColumnAccessor { private var nullsBuffer: ByteBuffer = _ private var nullCount: Int = _ private var seenNulls: Int = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala index b0ae3bf8db39d..1661c3f3ff4a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -26,7 +26,7 @@ import java.nio.{ByteOrder, ByteBuffer} * - positions for each null, in ascending order * - the non-null data (column data type, compression type, data...) 
*/ -trait NullableColumnBuilder extends ColumnBuilder { +private[sql] trait NullableColumnBuilder extends ColumnBuilder { private var nulls: ByteBuffer = _ private var pos: Int = _ private var nullCount: Int = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala index ffa7c16492295..c7efd30e87da4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala @@ -21,7 +21,7 @@ package columnar import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} import org.apache.spark.sql.execution.{SparkPlan, LeafNode} -case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan) +private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan) extends LeafNode { // For implicit conversion from `DataType` to `ColumnType`
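
To make the buffer layout touched by these patches easier to follow: the `ColumnAccessor` scaladoc above notes that values are extracted straight into a `MutableRow` to avoid boxing, and `ColumnAccessor.apply` reads the first 4 bytes of the buffer to find the column type before dispatching. The sketch below is a minimal, self-contained illustration of that "type ID header followed by raw values" shape for an INT column. It is not the patch's actual code; the object and method names are hypothetical, and real builders additionally handle null bookkeeping, initial sizing, and buffer growth.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object IntColumnLayoutSketch {
  // Hypothetical stand-in for the INT type ID (the patch assigns IntegerType the ID 0).
  val IntTypeId = 0

  // Build a buffer shaped like an INT column: a 4-byte type ID followed by the values.
  def buildIntColumn(values: Seq[Int]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(4 + values.length * 4).order(ByteOrder.nativeOrder())
    buffer.putInt(IntTypeId)
    values.foreach(v => buffer.putInt(v))
    buffer.rewind()
    buffer
  }

  // Read the column back: the leading 4 bytes identify the column type, the rest are values.
  def readIntColumn(buffer: ByteBuffer): (Int, List[Int]) = {
    val dup = buffer.duplicate().order(ByteOrder.nativeOrder())
    val typeId = dup.getInt()
    val values = scala.collection.mutable.ListBuffer.empty[Int]
    while (dup.hasRemaining) values += dup.getInt()
    (typeId, values.toList)
  }

  def main(args: Array[String]): Unit = {
    val column = buildIntColumn(Seq(1, 2, 3))
    println(readIntColumn(column)) // (0, List(1, 2, 3))
  }
}
```

Duplicating the buffer and forcing `ByteOrder.nativeOrder()` before reading mirrors what `ColumnAccessor.apply` does, so the original buffer's position is left untouched.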
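
The `NullableColumnBuilder` scaladoc above describes how null information is laid out ahead of the column data: a null count, the position of each null in ascending order, and then the non-null data. Here is a small sketch of that idea for an optional-int column, assuming hypothetical names and omitting the type ID and compression fields that the real builder also writes.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object NullableIntColumnSketch {
  // Write the nulls header followed by the non-null data:
  //   [null count][position of each null, ascending][non-null values]
  def build(values: Seq[Option[Int]]): ByteBuffer = {
    val nullPositions = values.zipWithIndex.collect { case (None, i) => i }
    val nonNulls = values.flatten
    val buffer = ByteBuffer
      .allocate(4 + 4 * nullPositions.length + 4 * nonNulls.length)
      .order(ByteOrder.nativeOrder())
    buffer.putInt(nullPositions.length)
    nullPositions.foreach(p => buffer.putInt(p))
    nonNulls.foreach(v => buffer.putInt(v))
    buffer.rewind()
    buffer
  }

  // Rebuild the original sequence by re-inserting nulls at the recorded positions.
  def read(buffer: ByteBuffer, rowCount: Int): Seq[Option[Int]] = {
    val dup = buffer.duplicate().order(ByteOrder.nativeOrder())
    val nullCount = dup.getInt()
    val nullPositions = (0 until nullCount).map(_ => dup.getInt()).toSet
    (0 until rowCount).map { i =>
      if (nullPositions.contains(i)) None else Some(dup.getInt())
    }
  }

  def main(args: Array[String]): Unit = {
    val column = build(Seq(Some(1), None, Some(3)))
    println(read(column, rowCount = 3)) // Vector(Some(1), None, Some(3))
  }
}
```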
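
Finally, the comment above `GENERIC` explains that types not covered by the native column types are stored as serialized byte arrays and deserialized again on extraction; in the patch this goes through the Kryo-based `SparkSqlSerializer`. The sketch below illustrates only the serialize-to-bytes round trip, substituting plain Java serialization so it stays self-contained; the object name and the use of `ObjectOutputStream` are illustrative choices, not the patch's implementation.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object GenericColumnSketch {
  // Serialize an arbitrary object into the byte-array form a GENERIC column would store.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Deserialize the stored bytes back into an object when the column is read.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val stored = serialize(Map("a" -> 1, "b" -> 2))
    println(deserialize[Map[String, Int]](stored)) // Map(a -> 1, b -> 2)
  }
}
```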