Commit e048111

chenghao-intel authored and Andrew Or committed
[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
Data spill with UnsafeRow causes an assertion failure:

```
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:165)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
  at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce with code (thanks andrewor14):

```scala
bin/spark-shell --master local --conf spark.shuffle.memoryFraction=0.005 --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao <[email protected]>

Closes apache#8635 from chenghao-intel/unsafe_spill.
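The root cause, as the one-line fix below suggests, is that UnsafeRowSerializer never writes the key at all: the key is only a partition id needed on the map side. When spilled records are read back and re-serialized while merging in `writePartitionedFile`, the deserialized key is therefore null, and the old `assert(key.isInstanceOf[Int])` fails. The toy sketch below walks through that flow; every name in it is invented for illustration and none of it is Spark API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Toy model of a key-less key/value serialization stream, illustrating why
// writeKey can see a null key during a spill merge.
object KeylessSerDeSketch {

  // "writeKey": the key is only a partition id, so nothing is written for it.
  // The relaxed assertion from this patch also tolerates a null key, which is
  // what shows up when spilled records are re-serialized during the merge.
  def writeKey(key: Any): Unit =
    assert(null == key || key.isInstanceOf[Int])

  def writeValue(out: DataOutputStream, value: String): Unit = out.writeUTF(value)

  // "readKey": no key bytes exist in the stream, so null is all the reader
  // can hand back for the key.
  def readKey(): AnyRef = null

  def readValue(in: DataInputStream): String = in.readUTF()

  def main(args: Array[String]): Unit = {
    // Map side: write two records, dropping their Int keys.
    val spill = new ByteArrayOutputStream()
    val out = new DataOutputStream(spill)
    writeKey(3); writeValue(out, "row-1")
    writeKey(7); writeValue(out, "row-2")
    out.flush()

    // Merge side: read the "spill" back and write it out again. Every key is
    // null here, which is exactly where the old assert(key.isInstanceOf[Int])
    // used to fail.
    val in = new DataInputStream(new ByteArrayInputStream(spill.toByteArray))
    val merged = new DataOutputStream(new ByteArrayOutputStream())
    (1 to 2).foreach { _ =>
      val key = readKey()
      val value = readValue(in)
      writeKey(key)              // passes only with the relaxed assertion
      writeValue(merged, value)
    }
    println("re-serialized 2 spilled records without an assertion failure")
  }
}
```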
1 parent 49da38e · commit e048111

File tree

3 files changed: +67 -5 lines


core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 6 additions & 0 deletions

@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val spills = new ArrayBuffer[SpilledFile]
 
+  /**
+   * Number of files this sorter has spilled so far.
+   * Exposed for testing.
+   */
+  private[spark] def numSpills: Int = spills.size
+
   override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined
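As a usage sketch (not part of the patch; `sorter` and `data` stand in for the setup used by the new test further down this page), the accessor lets a test verify that spilling actually happened around `insertAll`:

```scala
// Sketch only: assumes `sorter` is an ExternalSorter configured with tight
// memory limits and `data` is an iterator of records, as in the new
// UnsafeRowSerializerSuite test below.
assert(sorter.numSpills === 0)  // nothing spilled before inserting
sorter.insertAll(data)          // small memory thresholds force spills to disk
assert(sorter.numSpills > 0)    // at least one spill file was written
```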

sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala

Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     override def writeKey[T: ClassTag](key: T): SerializationStream = {
       // The key is only needed on the map side when computing partition ids. It does not need to
       // be shuffled.
-      assert(key.isInstanceOf[Int])
+      assert(null == key || key.isInstanceOf[Int])
       this
     }
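Illustration only (not code from the patch): the relaxed check accepts either a boxed Int partition id or a null key, and still rejects anything else.

```scala
// REPL-style check of the predicate used by the new assertion.
def keyIsAcceptable(key: Any): Boolean = null == key || key.isInstanceOf[Int]

assert(keyIsAcceptable(3))       // map-side partition id (boxed Int)
assert(keyIsAcceptable(null))    // key read back from a spill; it was never serialized
assert(!keyIsAcceptable("oops")) // anything else still trips the assertion
```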

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala

Lines changed: 60 additions & 4 deletions

@@ -17,13 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{File, DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.Utils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
+import org.apache.spark._
 
 
 /**

@@ -40,9 +44,15 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStrea
 class UnsafeRowSerializerSuite extends SparkFunSuite {
 
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
-    val internalRow = CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow]
+    val converter = unsafeRowConverter(schema)
+    converter(row)
+  }
+
+  private def unsafeRowConverter(schema: Array[DataType]): Row => UnsafeRow = {
     val converter = UnsafeProjection.create(schema)
-    converter.apply(internalRow)
+    (row: Row) => {
+      converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
+    }
   }
 
   test("toUnsafeRow() test helper method") {

@@ -87,4 +97,50 @@
     assert(!deserializerIter.hasNext)
     assert(input.closed)
   }
+
+  test("SPARK-10466: external sorter spilling with unsafe row serializer") {
+    var sc: SparkContext = null
+    var outputFile: File = null
+    val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
+    Utils.tryWithSafeFinally {
+      val conf = new SparkConf()
+        .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+        .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+        .set("spark.shuffle.memoryFraction", "0.0001")
+
+      sc = new SparkContext("local", "test", conf)
+      outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
+      // prepare data
+      val converter = unsafeRowConverter(Array(IntegerType))
+      val data = (1 to 1000).iterator.map { i =>
+        (i, converter(Row(i)))
+      }
+      val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+        partitioner = Some(new HashPartitioner(10)),
+        serializer = Some(new UnsafeRowSerializer(numFields = 1)))
+
+      // Ensure we spilled something and have to merge them later
+      assert(sorter.numSpills === 0)
+      sorter.insertAll(data)
+      assert(sorter.numSpills > 0)
+
+      // Merging spilled files should not throw assertion error
+      val taskContext =
+        new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
+      taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
+      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)
+    } {
+      // Clean up
+      if (sc != null) {
+        sc.stop()
+      }
+
+      // restore the spark env
+      SparkEnv.set(oldEnv)
+
+      if (outputFile != null) {
+        outputFile.delete()
+      }
+    }
+  }
 }
