[SPARK-1371][WIP] Compression support for Spark SQL in-memory columnar storage #285

Closed · 7 commits
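
The two files below restructure the columnar builders and accessors so that null handling, statistics, and compression are layered onto a single generic base class as stackable Scala traits (e.g. `BasicColumnBuilder with NullableColumnBuilder with AllCompressionSchemes with CompressibleColumnBuilder[T]`), replacing the per-type `doAppendFrom`/`doExtractTo` overrides. For orientation, here is a minimal self-contained sketch of that stackable-trait pattern, using simplified hypothetical names rather than the PR's actual classes:

```scala
import java.nio.ByteBuffer

// Simplified stand-ins, for illustration only -- not the PR's real interfaces.
trait Builder {
  def appendFrom(value: Int): Unit
  def build(): ByteBuffer
}

// One concrete base class does the raw writing...
class BasicBuilder extends Builder {
  protected var buffer: ByteBuffer = ByteBuffer.allocate(1024)
  override def appendFrom(value: Int): Unit = buffer.putInt(value)
  override def build(): ByteBuffer = { buffer.flip(); buffer }
}

// ...and each concern is a stackable trait that decorates `super`.
trait StatsGathering extends Builder {
  var count = 0
  abstract override def appendFrom(value: Int): Unit = {
    count += 1               // gather per-column statistics here
    super.appendFrom(value)  // then delegate to the layer underneath
  }
}

object Demo extends App {
  // Concerns are composed at mix-in time; no per-type overrides needed.
  val builder = new BasicBuilder with StatsGathering
  Seq(1, 2, 3).foreach(builder.appendFrom)
  println(s"rows=${builder.count}, bytes=${builder.build().remaining()}")
}
```

Each mixin's `abstract override` wraps the implementation below it in the trait linearization, which is what lets the concrete builders and accessors in this diff shrink to one-line declarations.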

ColumnAccessor.scala

@@ -21,7 +21,7 @@ import java.nio.{ByteOrder, ByteBuffer}
 
 import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
 import org.apache.spark.sql.catalyst.expressions.MutableRow
-import org.apache.spark.sql.execution.SparkSqlSerializer
+import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
 
 /**
  * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
@@ -41,121 +41,66 @@ private[sql] trait ColumnAccessor {
   protected def underlyingBuffer: ByteBuffer
 }
 
-private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
+private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
+    protected val buffer: ByteBuffer,
+    protected val columnType: ColumnType[T, JvmType])
   extends ColumnAccessor {
 
   protected def initialize() {}
 
-  def columnType: ColumnType[T, JvmType]
-
   def hasNext = buffer.hasRemaining
 
   def extractTo(row: MutableRow, ordinal: Int) {
-    doExtractTo(row, ordinal)
+    columnType.setField(row, ordinal, extractSingle(buffer))
   }
 
-  protected def doExtractTo(row: MutableRow, ordinal: Int)
+  def extractSingle(buffer: ByteBuffer): JvmType = columnType.extract(buffer)
 
   protected def underlyingBuffer = buffer
 }
 
 private[sql] abstract class NativeColumnAccessor[T <: NativeType](
-    buffer: ByteBuffer,
-    val columnType: NativeColumnType[T])
-  extends BasicColumnAccessor[T, T#JvmType](buffer)
+    override protected val buffer: ByteBuffer,
+    override protected val columnType: NativeColumnType[T])
+  extends BasicColumnAccessor(buffer, columnType)
   with NullableColumnAccessor
+  with CompressibleColumnAccessor[T]
 
 private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, BOOLEAN) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setBoolean(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, BOOLEAN)
 
 private[sql] class IntColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, INT) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setInt(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, INT)
 
 private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, SHORT) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setShort(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, SHORT)
 
 private[sql] class LongColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, LONG) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setLong(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, LONG)
 
 private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, BYTE) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setByte(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, BYTE)
 
 private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, DOUBLE) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setDouble(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, DOUBLE)
 
 private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, FLOAT) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setFloat(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, FLOAT)
 
 private[sql] class StringColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, STRING) {
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row.setString(ordinal, columnType.extract(buffer))
-  }
-}
+  extends NativeColumnAccessor(buffer, STRING)
 
 private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
-  extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
-  with NullableColumnAccessor {
-
-  def columnType = BINARY
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    row(ordinal) = columnType.extract(buffer)
-  }
-}
+  extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
+  with NullableColumnAccessor
 
 private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
-  extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
-  with NullableColumnAccessor {
-
-  def columnType = GENERIC
-
-  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
-    val serialized = columnType.extract(buffer)
-    row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
-  }
-}
+  extends BasicColumnAccessor[DataType, Array[Byte]](buffer, GENERIC)
+  with NullableColumnAccessor
 
 private[sql] object ColumnAccessor {
-  def apply(b: ByteBuffer): ColumnAccessor = {
-    // The first 4 bytes in the buffer indicates the column type.
-    val buffer = b.duplicate().order(ByteOrder.nativeOrder())
+  def apply(buffer: ByteBuffer): ColumnAccessor = {
+    // The first 4 bytes in the buffer indicate the column type.
    val columnTypeId = buffer.getInt()
 
     columnTypeId match {
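
A note on the accessor refactoring above: once `ColumnType` supplies both `extract` (read a value from the buffer) and `setField` (write it into a row), the generic `extractTo` covers every type, and the per-type accessors reduce to one-line subclasses. A rough sketch of that dispatch, with pared-down hypothetical stand-ins for Spark's `ColumnType` and `MutableRow`:

```scala
import java.nio.ByteBuffer

// Hypothetical, pared-down stand-ins for ColumnType / MutableRow.
abstract class ColType[JvmType] {
  def extract(buffer: ByteBuffer): JvmType
  def setField(row: Array[Any], ordinal: Int, value: JvmType): Unit = row(ordinal) = value
}

object IntType extends ColType[Int] {
  override def extract(buffer: ByteBuffer): Int = buffer.getInt()
}

// The generic accessor loop: the type-specific work lives in the ColType,
// so no accessor subclass needs its own extraction override.
class Accessor[JvmType](buffer: ByteBuffer, colType: ColType[JvmType]) {
  def hasNext: Boolean = buffer.hasRemaining
  def extractTo(row: Array[Any], ordinal: Int): Unit =
    colType.setField(row, ordinal, colType.extract(buffer))
}

object AccessorDemo extends App {
  val buf = ByteBuffer.allocate(8).putInt(7).putInt(42)
  buf.flip()
  val row = new Array[Any](1)
  val accessor = new Accessor(buf, IntType)
  while (accessor.hasNext) {
    accessor.extractTo(row, 0)
    println(row(0)) // prints 7, then 42
  }
}
```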

ColumnBuilder.scala

@@ -22,45 +22,52 @@ import java.nio.{ByteBuffer, ByteOrder}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.ColumnBuilder._
-import org.apache.spark.sql.execution.SparkSqlSerializer
+import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
 
 private[sql] trait ColumnBuilder {
   /**
    * Initializes with an approximate lower bound on the expected number of elements in this column.
    */
   def initialize(initialSize: Int, columnName: String = "")
 
   /**
    * Appends `row(ordinal)` to the column builder.
    */
   def appendFrom(row: Row, ordinal: Int)
 
+  /**
+   * Column statistics information
+   */
+  def columnStats: ColumnStats[_, _]
+
   /**
    * Returns the final columnar byte buffer.
    */
   def build(): ByteBuffer
 }
 
-private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
+private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
+    val columnStats: ColumnStats[T, JvmType],
+    val columnType: ColumnType[T, JvmType])
+  extends ColumnBuilder {
 
-  private var columnName: String = _
-  protected var buffer: ByteBuffer = _
+  protected var columnName: String = _
 
-  def columnType: ColumnType[T, JvmType]
+  protected var buffer: ByteBuffer = _
 
   override def initialize(initialSize: Int, columnName: String = "") = {
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
-    buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)
+
+    // Reserves 4 bytes for column type ID
+    buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
     buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
   }
 
-  // Have to give a concrete implementation to make mixin possible
   override def appendFrom(row: Row, ordinal: Int) {
-    doAppendFrom(row, ordinal)
-  }
-
-  // Concrete `ColumnBuilder`s can override this method to append values
-  protected def doAppendFrom(row: Row, ordinal: Int)
-
-  // Helper method to append primitive values (to avoid boxing cost)
-  protected def appendValue(v: JvmType) {
-    buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
-    columnType.append(v, buffer)
+    val field = columnType.getField(row, ordinal)
+    buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
+    columnType.append(field, buffer)
   }
 
   override def build() = {
@@ -69,83 +76,39 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
   }
 }
 
-private[sql] abstract class NativeColumnBuilder[T <: NativeType](
-    val columnType: NativeColumnType[T])
-  extends BasicColumnBuilder[T, T#JvmType]
+private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
+    columnType: ColumnType[T, JvmType])
+  extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
   with NullableColumnBuilder
 
-private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getBoolean(ordinal))
-  }
-}
-
-private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getInt(ordinal))
-  }
-}
+private[sql] abstract class NativeColumnBuilder[T <: NativeType](
+    override val columnStats: NativeColumnStats[T],
+    override val columnType: NativeColumnType[T])
+  extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
+  with NullableColumnBuilder
+  with AllCompressionSchemes
+  with CompressibleColumnBuilder[T]
 
-private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getShort(ordinal))
-  }
-}
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
 
-private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getLong(ordinal))
-  }
-}
+private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
 
-private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getByte(ordinal))
-  }
-}
+private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)
 
-private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getDouble(ordinal))
-  }
-}
+private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)
 
-private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getFloat(ordinal))
-  }
-}
+private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)
 
-private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row.getString(ordinal))
-  }
-}
+private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)
 
-private[sql] class BinaryColumnBuilder
-  extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
-  with NullableColumnBuilder {
+private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
 
-  def columnType = BINARY
+private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
 
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    appendValue(row(ordinal).asInstanceOf[Array[Byte]])
-  }
-}
+private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
 
 // TODO (lian) Add support for array, struct and map
-private[sql] class GenericColumnBuilder
-  extends BasicColumnBuilder[DataType, Array[Byte]]
-  with NullableColumnBuilder {
-
-  def columnType = GENERIC
-
-  override def doAppendFrom(row: Row, ordinal: Int) {
-    val serialized = SparkSqlSerializer.serialize(row(ordinal))
-    buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
-    columnType.append(serialized, buffer)
-  }
-}
+private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
 
 private[sql] object ColumnBuilder {
   val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
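
The rewritten `appendFrom` above relies on `ColumnBuilder.ensureFreeSpace`, which this hunk references but does not show. Here is a sketch of what such a grow-on-demand helper plausibly does; the growth policy is an assumption, not necessarily the PR's code:

```scala
import java.nio.{ByteBuffer, ByteOrder}

object BufferUtil {
  // Sketch of a grow-on-demand helper in the spirit of `ensureFreeSpace`;
  // the doubling policy here is an assumption, not the PR's actual code.
  def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) {
      orig // enough room left; keep writing into the same buffer
    } else {
      // Grow by at least the current capacity so appends stay amortized O(1).
      val newCapacity = orig.capacity + math.max(orig.capacity, size)
      val pos = orig.position()
      ByteBuffer
        .allocate(newCapacity)
        .order(ByteOrder.nativeOrder())
        .put(orig.array(), 0, pos) // carry over the bytes written so far
    }
  }
}
```

Because growth allocates a fresh buffer, the caller reassigns the result, as in `buffer = ensureFreeSpace(buffer, columnType.actualSize(field))` above.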