Skip to content

Commit bad21ed

Browse files
committed
[SPARK-2650][SQL] Build column buffers in smaller batches
Author: Michael Armbrust <[email protected]> Closes #1880 from marmbrus/columnBatches and squashes the following commits: 0649987 [Michael Armbrust] add test 4756fad [Michael Armbrust] fix compilation 2314532 [Michael Armbrust] Build column buffers in smaller batches
1 parent c686b7d commit bad21ed

File tree

7 files changed

+70
-36
lines changed

7 files changed

+70
-36
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.util.Properties
2525

2626
private[spark] object SQLConf {
2727
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
28+
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
2829
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
2930
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
3031
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -71,6 +72,9 @@ trait SQLConf {
7172
/** When true tables cached using the in-memory columnar caching will be compressed. */
7273
private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
7374

75+
/** The number of rows that will be */
76+
private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
77+
7478
/** Number of partitions to use for shuffle operators. */
7579
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
7680

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
273273
currentTable.logicalPlan
274274

275275
case _ =>
276-
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
276+
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
277277
}
278278

279279
catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -284,7 +284,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
284284
table(tableName).queryExecution.analyzed match {
285285
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
286286
// we reregister the RDD as a table.
287-
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
287+
case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
288288
inMem.cachedColumnBuffers.unpersist()
289289
catalog.unregisterTable(None, tableName)
290290
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@ import org.apache.spark.sql.Row
2828
import org.apache.spark.SparkConf
2929

3030
object InMemoryRelation {
31-
def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
32-
new InMemoryRelation(child.output, useCompression, child)()
31+
def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
32+
new InMemoryRelation(child.output, useCompression, batchSize, child)()
3333
}
3434

3535
private[sql] case class InMemoryRelation(
3636
output: Seq[Attribute],
3737
useCompression: Boolean,
38+
batchSize: Int,
3839
child: SparkPlan)
3940
(private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
4041
extends LogicalPlan with MultiInstanceRelation {
@@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
4344
// As in Spark, the actual work of caching is lazy.
4445
if (_cachedColumnBuffers == null) {
4546
val output = child.output
46-
val cached = child.execute().mapPartitions { iterator =>
47-
val columnBuilders = output.map { attribute =>
48-
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
49-
}.toArray
50-
51-
var row: Row = null
52-
while (iterator.hasNext) {
53-
row = iterator.next()
54-
var i = 0
55-
while (i < row.length) {
56-
columnBuilders(i).appendFrom(row, i)
57-
i += 1
47+
val cached = child.execute().mapPartitions { baseIterator =>
48+
new Iterator[Array[ByteBuffer]] {
49+
def next() = {
50+
val columnBuilders = output.map { attribute =>
51+
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
52+
}.toArray
53+
54+
var row: Row = null
55+
var rowCount = 0
56+
57+
while (baseIterator.hasNext && rowCount < batchSize) {
58+
row = baseIterator.next()
59+
var i = 0
60+
while (i < row.length) {
61+
columnBuilders(i).appendFrom(row, i)
62+
i += 1
63+
}
64+
rowCount += 1
65+
}
66+
67+
columnBuilders.map(_.build())
5868
}
59-
}
6069

61-
Iterator.single(columnBuilders.map(_.build()))
70+
def hasNext = baseIterator.hasNext
71+
}
6272
}.cache()
6373

6474
cached.setName(child.toString)
@@ -74,6 +84,7 @@ private[sql] case class InMemoryRelation(
7484
new InMemoryRelation(
7585
output.map(_.newInstance),
7686
useCompression,
87+
batchSize,
7788
child)(
7889
_cachedColumnBuffers).asInstanceOf[this.type]
7990
}
@@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
90101

91102
override def execute() = {
92103
relation.cachedColumnBuffers.mapPartitions { iterator =>
93-
val columnBuffers = iterator.next()
94-
assert(!iterator.hasNext)
104+
// Find the ordinals of the requested columns. If none are requested, use the first.
105+
val requestedColumns =
106+
if (attributes.isEmpty) {
107+
Seq(0)
108+
} else {
109+
attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
110+
}
95111

96112
new Iterator[Row] {
97-
// Find the ordinals of the requested columns. If none are requested, use the first.
98-
val requestedColumns =
99-
if (attributes.isEmpty) {
100-
Seq(0)
101-
} else {
102-
attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
103-
}
113+
private[this] var columnBuffers: Array[ByteBuffer] = null
114+
private[this] var columnAccessors: Seq[ColumnAccessor] = null
115+
nextBatch()
116+
117+
private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
104118

105-
val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
106-
val nextRow = new GenericMutableRow(columnAccessors.length)
119+
def nextBatch() = {
120+
columnBuffers = iterator.next()
121+
columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
122+
}
107123

108124
override def next() = {
125+
if (!columnAccessors.head.hasNext) {
126+
nextBatch()
127+
}
128+
109129
var i = 0
110130
while (i < nextRow.length) {
111131
columnAccessors(i).extractTo(nextRow, i)
@@ -114,7 +134,7 @@ private[sql] case class InMemoryColumnarTableScan(
114134
nextRow
115135
}
116136

117-
override def hasNext = columnAccessors.head.hasNext
137+
override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
118138
}
119139
}
120140
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,19 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableSca
2222
import org.apache.spark.sql.test.TestSQLContext
2323
import org.apache.spark.sql.test.TestSQLContext._
2424

25+
case class BigData(s: String)
26+
2527
class CachedTableSuite extends QueryTest {
2628
TestData // Load test tables.
2729

30+
test("too big for memory") {
31+
val data = "*" * 10000
32+
sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
33+
cacheTable("bigData")
34+
assert(table("bigData").count() === 1000000L)
35+
uncacheTable("bigData")
36+
}
37+
2838
test("SPARK-1669: cacheTable should be idempotent") {
2939
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
3040

@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest {
3747

3848
cacheTable("testData")
3949
table("testData").queryExecution.analyzed match {
40-
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
50+
case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
4151
fail("cacheTable is not idempotent")
4252

4353
case _ =>

sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
2828

2929
test("simple columnar query") {
3030
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
31-
val scan = InMemoryRelation(useCompression = true, plan)
31+
val scan = InMemoryRelation(useCompression = true, 5, plan)
3232

3333
checkAnswer(scan, testData.collect().toSeq)
3434
}
3535

3636
test("projection") {
3737
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
38-
val scan = InMemoryRelation(useCompression = true, plan)
38+
val scan = InMemoryRelation(useCompression = true, 5, plan)
3939

4040
checkAnswer(scan, testData.collect().map {
4141
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
4444

4545
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
4646
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
47-
val scan = InMemoryRelation(useCompression = true, plan)
47+
val scan = InMemoryRelation(useCompression = true, 5, plan)
4848

4949
checkAnswer(scan, testData.collect().toSeq)
5050
checkAnswer(scan, testData.collect().toSeq)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
137137
castChildOutput(p, table, child)
138138

139139
case p @ logical.InsertIntoTable(
140-
InMemoryRelation(_, _,
140+
InMemoryRelation(_, _, _,
141141
HiveTableScan(_, table, _)), _, child, _) =>
142142
castChildOutput(p, table, child)
143143
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
4545
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
4646
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
4747
case logical.InsertIntoTable(
48-
InMemoryRelation(_, _,
48+
InMemoryRelation(_, _, _,
4949
HiveTableScan(_, table, _)), partition, child, overwrite) =>
5050
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
5151
case _ => Nil

0 commit comments

Comments
 (0)