4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,6 +25,7 @@ import java.util.Properties
 
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
+  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
   val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -71,6 +72,9 @@ trait SQLConf {
   /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
 
+  /** The number of rows that will be grouped together into a single column batch when caching. */
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
 
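The new setting rides the existing getConf machinery, so it can be tuned per context before anything is cached. A minimal sketch of lowering the batch size for very wide rows (the SparkContext setup is assumed, not part of the patch, and setConf(key, value) is assumed to be the setter the SQLConf trait exposes on SQLContext):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("batchSizeDemo").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    // Each cached column batch now holds at most 100 rows (the patch defaults to 1000).
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "100")

Smaller batches bound the memory spike while a batch's column builders are being filled; larger batches give the compression schemes more data to work with.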
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -273,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
           currentTable.logicalPlan
 
         case _ =>
-          InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+          InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
       }
 
     catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -284,7 +284,7 @@
     table(tableName).queryExecution.analyzed match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
+      case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.SparkConf
 
 object InMemoryRelation {
-  def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, child)()
+  def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, child)()
 }
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
+    batchSize: Int,
     child: SparkPlan)
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
@@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {
     val output = child.output
-    val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
-      }.toArray
-
-      var row: Row = null
-      while (iterator.hasNext) {
-        row = iterator.next()
-        var i = 0
-        while (i < row.length) {
-          columnBuilders(i).appendFrom(row, i)
-          i += 1
-        }
-      }
-
-      Iterator.single(columnBuilders.map(_.build()))
+    val cached = child.execute().mapPartitions { baseIterator =>
+      new Iterator[Array[ByteBuffer]] {
+        def next() = {
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
Contributor:
A more precise initial buffer size can be used here:

    val columnType = ColumnType(attribute.dataType)
    ColumnBuilder(columnType.typeId, columnType.defaultSize * batchSize, attribute.name, useCompression)

+          }.toArray
+
+          var row: Row = null
+          var rowCount = 0
+
+          while (baseIterator.hasNext && rowCount < batchSize) {
+            row = baseIterator.next()
+            var i = 0
+            while (i < row.length) {
+              columnBuilders(i).appendFrom(row, i)
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          columnBuilders.map(_.build())
+        }
+
+        def hasNext = baseIterator.hasNext
+      }
     }.cache()
 
     cached.setName(child.toString)
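The hunk above swaps the one-buffer-per-partition build for a lazy iterator that materializes a single batch per next() call, so a partition never holds all of its column buffers at once. A standalone sketch of the same pattern with the ColumnBuilder machinery stripped out (illustrative names, not from the patch):

    // Group a row iterator into batches of at most batchSize rows; each
    // next() drains the input just far enough to build one batch, and
    // hasNext simply delegates to the underlying iterator.
    def batched[T](rows: Iterator[T], batchSize: Int): Iterator[Seq[T]] =
      new Iterator[Seq[T]] {
        def hasNext = rows.hasNext
        def next() = {
          val batch = Seq.newBuilder[T]
          var rowCount = 0
          while (rows.hasNext && rowCount < batchSize) {
            batch += rows.next()
            rowCount += 1
          }
          batch.result()
        }
      }

    // batched(Iterator.range(0, 10), 3).toList
    //   => List(List(0, 1, 2), List(3, 4, 5), List(6, 7, 8), List(9))

This is essentially what Iterator.grouped(batchSize) provides; the patch hand-rolls the loop so each batch builds column buffers rather than collecting rows.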
@@ -74,6 +84,7 @@
     new InMemoryRelation(
       output.map(_.newInstance),
       useCompression,
+      batchSize,
       child)(
       _cachedColumnBuffers).asInstanceOf[this.type]
   }
@@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(

   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
-      val columnBuffers = iterator.next()
-      assert(!iterator.hasNext)
+      // Find the ordinals of the requested columns. If none are requested, use the first.
+      val requestedColumns =
+        if (attributes.isEmpty) {
+          Seq(0)
Contributor:
Maybe we can use the narrowest column instead of the first one by checking the default sizes of the columns:

    val narrowest = relation.output.indices.minBy { i =>
      ColumnType(relation.output(i).dataType).defaultSize
    }
    Seq(narrowest)

Contributor (author):
Yeah, that would be better. Really, though, I think we should use statistics from #1883 to skip decoding entirely.

+        } else {
+          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+        }
 
       new Iterator[Row] {
-        // Find the ordinals of the requested columns. If none are requested, use the first.
-        val requestedColumns =
-          if (attributes.isEmpty) {
-            Seq(0)
-          } else {
-            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-          }
-
+        private[this] var columnBuffers: Array[ByteBuffer] = null
+        private[this] var columnAccessors: Seq[ColumnAccessor] = null
+        nextBatch()
Contributor:
@marmbrus @liancheng It's great to see that we will support smaller batch processing in InMemoryRelation. Ideally, though, we could rewind the ByteBuffers at the beginning of each iteration inside InMemoryRelation at runtime, keeping the whole detail transparent to the iterator in InMemoryColumnarTableScan. That way most of the code in InMemoryColumnarTableScan could stay unchanged.

Contributor:
Maybe I don't get it correctly, but do you mean we should try to reuse batch buffers rather than always allocating new ones for each new batch? I like the idea, and it would surely make the column buffer building process more memory efficient. But currently, due to the way ColumnBuilder is implemented, buffer reuse needs more work, probably in another PR :)

Contributor:
Yes, @liancheng, you're right. Sorry I didn't make it clearer; I will create another PR for this. :-)
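For what the rewind idea might look like: java.nio buffers can hand out independent read positions over shared bytes, which is one way the cached batches could be re-read without the scan knowing. A rough sketch under that assumption (none of this is in the patch):

    import java.nio.ByteBuffer

    // Produce fresh read positions over the same cached bytes, so each
    // pass over a batch starts at offset 0 without mutating the buffers
    // that other readers may share.
    def freshBuffers(cached: Array[ByteBuffer]): Array[ByteBuffer] =
      cached.map { buf =>
        val dup = buf.duplicate() // shares content; independent position/limit
        dup.rewind()
        dup
      }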


-        val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        val nextRow = new GenericMutableRow(columnAccessors.length)
+        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
+
+        def nextBatch() = {
+          columnBuffers = iterator.next()
+          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
+        }
 
         override def next() = {
+          if (!columnAccessors.head.hasNext) {
+            nextBatch()
+          }
+
           var i = 0
           while (i < nextRow.length) {
             columnAccessors(i).extractTo(nextRow, i)
@@ -114,7 +134,7 @@
           nextRow
         }
 
-        override def hasNext = columnAccessors.head.hasNext
+        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
       }
     }
   }
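The read side mirrors the write side: the scan now walks an iterator of batches and re-creates its accessors when the current batch runs dry, which is what the new hasNext and nextBatch pair implements. A stripped-down model of that hand-off (plain collections, illustrative names):

    // Flatten an iterator of batches back into rows, pulling the next
    // batch only once the current one is exhausted.
    def rows[T](batches: Iterator[Seq[T]]): Iterator[T] = new Iterator[T] {
      private var current: Iterator[T] = Iterator.empty
      def hasNext = current.hasNext || batches.hasNext
      def next() = {
        if (!current.hasNext) current = batches.next().iterator
        current.next()
      }
    }

Like the patch, this leans on every batch being non-empty, which the write side guarantees by only building a batch when baseIterator.hasNext holds.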
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,9 +22,19 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
+case class BigData(s: String)
+
 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.
 
+  test("too big for memory") {
+    val data = "*" * 10000
+    sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+    cacheTable("bigData")
+    assert(table("bigData").count() === 1000000L)
+    uncacheTable("bigData")
+  }
+
   test("SPARK-1669: cacheTable should be idempotent") {
     assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
 
@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest {

cacheTable("testData")
table("testData").queryExecution.analyzed match {
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")

case _ =>
sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, plan)
val scan = InMemoryRelation(useCompression = true, 5, plan)

checkAnswer(scan, testData.collect().toSeq)
}

test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, plan)
val scan = InMemoryRelation(useCompression = true, 5, plan)

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, plan)
val scan = InMemoryRelation(useCompression = true, 5, plan)

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       castChildOutput(p, table, child)
 
     case p @ logical.InsertIntoTable(
-           InMemoryRelation(_, _,
+           InMemoryRelation(_, _, _,
             HiveTableScan(_, table, _)), _, child, _) =>
       castChildOutput(p, table, child)
   }
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
     case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
       InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
     case logical.InsertIntoTable(
-           InMemoryRelation(_, _,
+           InMemoryRelation(_, _, _,
             HiveTableScan(_, table, _)), partition, child, overwrite) =>
       InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
     case _ => Nil
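The Hive-side edits are mechanical fallout: adding a constructor field to a case class changes the arity of the extractor its companion generates, so every InMemoryRelation(...) pattern needs one more wildcard. A tiny standalone illustration (hypothetical class, not from the patch):

    case class Relation(output: Seq[String], compressed: Boolean, batchSize: Int)

    def describe(plan: Any): String = plan match {
      // Before batchSize existed, `case Relation(_, _) =>` matched; with the
      // extra field that pattern no longer compiles ("wrong number of
      // arguments for pattern"), hence the added underscore in the hunks above.
      case Relation(_, _, _) => "in-memory relation"
      case _ => "something else"
    }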