[SPARK-10466][SQL] UnsafeRow SerDe exception with data spill #8635


Closed
chenghao-intel wants to merge 3 commits into apache:master from chenghao-intel:unsafe_spill

Conversation

chenghao-intel
Contributor

Data Spill with UnsafeRow causes assert failure.

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

To reproduce that with code (thanks @andrewor14):

bin/spark-shell --master local
  --conf spark.shuffle.memoryFraction=0.005
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()

@@ -72,7 +72,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
override def writeKey[T: ClassTag](key: T): SerializationStream = {
// The key is only needed on the map side when computing partition ids. It does not need to
// be shuffled.
assert(key.isInstanceOf[Int])

Contributor

wouldn't the right thing to do here be to allow nulls as well? In general it's a bad idea to remove assertions

assert(key == null || key.isInstanceOf[Int])

Contributor

How about changing the dummy value to a number (-1) instead of null?

Contributor

yeah I like that better. We can't have a partition ID of -1, whereas null.asInstanceOf[Int] may be confused with the partition ID of 0
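
For reference, here is the gotcha being described (a quick Scala REPL check, not code from this PR): unboxing null to a primitive Int silently yields 0, so a null dummy key would be indistinguishable from a record headed for partition 0, while a -1 dummy would still satisfy the original assertion.

```scala
scala> (null: Any).isInstanceOf[Int]    // why the original assert fails for a null key
res0: Boolean = false

scala> (null: Any).asInstanceOf[Int]    // unboxing null silently yields 0, not an error
res1: Int = 0

scala> (-1: Any).isInstanceOf[Int]      // a -1 dummy key still satisfies the assert
res2: Boolean = true
```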

@chenghao-intel
Contributor Author

cc @rxin

@SparkQA

SparkQA commented Sep 7, 2015

Test build #42079 has finished for PR 8635 at commit 684cdb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 7, 2015

Test build #42090 has finished for PR 8635 at commit 229ce8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Sep 8, 2015

@chenghao-intel Can you add a unit test?

@andrewor14
Contributor

By the way, I was able to come up with a smaller reproduction:

bin/spark-shell --master local
  --conf spark.shuffle.memoryFraction=0.005
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()

@chenghao-intel
Contributor Author

Yes, that's simpler for a unit test; I will steal it. :)


import org.apache.spark.{SparkFunSuite, SparkContext, SparkConf}

class MiniSparkSQLClusterSuite extends SparkFunSuite {
Contributor Author

Ideally we wouldn't need to create a special unit test for this bug fix; however, there are some other issues which probably require re-creating the SparkContext with a different SparkConf.
For example: https://issues.apache.org/jira/browse/SPARK-10474
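
(For illustration only: a minimal sketch of that pattern, using a hypothetical withSparkContext helper that is not part of this PR, showing how a test can build a SparkContext from a test-specific SparkConf and tear it down afterwards.)

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper: run a test body against a SparkContext built from a custom
// SparkConf, and always stop the context so the next test can use different settings.
def withSparkContext(conf: SparkConf)(body: SparkContext => Unit): Unit = {
  val sc = new SparkContext(conf.setMaster("local").setAppName("test"))
  try body(sc) finally sc.stop()
}

// Example: force aggressive spilling for a single test.
withSparkContext(new SparkConf().set("spark.shuffle.memoryFraction", "0.005")) { sc =>
  assert(sc.parallelize(1 to 10).count() == 10)
}
```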

@andrewor14
Contributor

@chenghao-intel thanks for adding the test. When I posted the code reproduction, it wasn't meant as unit-test code but as a way for those following this issue to reproduce it. Given that we understand the root cause of this issue, I would prefer a finer-grained test that doesn't rely on thresholds.

@chenghao-intel
Contributor Author

Thank you @andrewor14, I agree; a unit test like that is too tricky. I will follow your idea and rewrite the unit test.

@SparkQA

SparkQA commented Sep 9, 2015

Test build #42178 has finished for PR 8635 at commit c47c53c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Make sure it spilled
assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0)

assert(sorter.writePartitionedFile(shuffleBlockId, taskContext, outputFile).sum > 0)
Contributor Author

An exception will be thrown here if we don't change the UnsafeRowSerializer as above.

@chenghao-intel
Contributor Author

@andrewor14 it seems very difficult to write a very simple unit test, as ExternalSorter has to work with lots of other components, hence I added some mock objects.

That mock machinery should be helpful: I found another interesting bug, and I can continue to fix it once this is merged.

@SparkQA

SparkQA commented Sep 9, 2015

Test build #42202 has finished for PR 8635 at commit 7f09a62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Sep 9, 2015

LGTM

@andrewor14
Contributor

@chenghao-intel thanks for taking the time to write the test. However, I think it is much more complicated than necessary. I was able to add the same test to the existing UnsafeRowSerializerSuite in ~50 lines without all the mocking. Can you use this one instead?

test("SPARK-10466: external sorter spilling with unsafe row serializer") {
  val conf = new SparkConf()
    .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
    .set("spark.shuffle.sort.bypassMergeThreshold", "0")
    .set("spark.shuffle.memoryFraction", "0.0001")
  var sc: SparkContext = null
  var outputFile: File = null
  try {
    sc = new SparkContext("local", "test", conf)
    outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
    val data = (1 to 1000).iterator.map { i =>
      val internalRow = CatalystTypeConverters.convertToCatalyst(Row(i)).asInstanceOf[InternalRow]
      val unsafeRow = UnsafeProjection.create(Array(IntegerType: DataType)).apply(internalRow)
      (i, unsafeRow)
    }
    val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
      partitioner = Some(new HashPartitioner(10)),
      serializer = Some(new UnsafeRowSerializer(numFields = 2)))

    // Ensure we spilled something and have to merge them later
    assert(sorter.numSpills === 0)
    sorter.insertAll(data)
    assert(sorter.numSpills > 0)

    // Merging spilled files should not throw assertion error
    val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
    taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
    sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)

  } finally {
    // Clean up
    if (sc != null) {
      sc.stop()
    }
    if (outputFile != null) {
      outputFile.delete()
    }
  }
}
// In ExternalSorter.scala:
/**
 * Number of files this sorter has spilled so far.
 * Exposed for testing.
 */
private[spark] def numSpills: Int = spills.size

@chenghao-intel
Contributor Author

Thank you @andrewor14, your code is much simpler; I took it already. :)

@SparkQA

SparkQA commented Sep 10, 2015

Test build #1734 has finished for PR 8635 at commit b8dd7eb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Test build #1733 has finished for PR 8635 at commit b8dd7eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BlockFetchException(messages: String, throwable: Throwable)

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42235 has finished for PR 8635 at commit 68ff3d3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42236 has finished for PR 8635 at commit e8b27b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42241 has finished for PR 8635 at commit e8b27b5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42263 has finished for PR 8635 at commit e8b27b5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

The latest commit actually already passed tests:

Test build #42236 has finished for PR 8635 at commit e8b27b5.
This patch passes all tests.

LGTM, I'm merging this into master and 1.5. Thanks @chenghao-intel.

converter(row)
}

private def unsafeRowConverter(schema: Array[DataType]): Row => UnsafeRow = {
Contributor

this method seems strictly unnecessary... we can just remove it in the future.

Contributor Author

Actually, UnsafeProjection.create(schema) does codegen, and this takes a long time if we have to generate a large number of UnsafeRows.

Contributor

I mean we can just inline it in toUnsafeRow. There's no reason why it needs to be its own method.

Contributor Author

Yes, I see what you mean. If we inline that in toUnsafeRow, then every call of toUnsafeRow will create a new converter instance for the schema, which is actually very expensive, since creating the converter instance triggers codegen internally.

We should probably remove the function toUnsafeRow in the future, since it always causes a performance problem and people may not even notice it.
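
To make the reuse pattern concrete, here is a minimal sketch (illustrative only; it mirrors the conversions in the test quoted earlier rather than the exact helper in this PR): the codegen happens once when the projection is created, and the resulting projection is then applied to every row.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, IntegerType}

val schema = Array[DataType](IntegerType)
val toUnsafe = UnsafeProjection.create(schema)      // expensive: codegen happens once here
val unsafeRows: Iterator[UnsafeRow] = (1 to 1000).iterator.map { i =>
  val internal = CatalystTypeConverters.convertToCatalyst(Row(i)).asInstanceOf[InternalRow]
  // copy() because the generated projection reuses an internal buffer across calls
  toUnsafe(internal).copy()
}
```

Creating the projection inside the map closure instead would trigger codegen once per row, which is the cost being discussed here.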

asfgit pushed a commit that referenced this pull request Sep 10, 2015
Data Spill with UnsafeRow causes assert failure.

```
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:165)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce that with code (thanks andrewor14):
```scala
bin/spark-shell --master local
  --conf spark.shuffle.memoryFraction=0.005
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao <[email protected]>

Closes #8635 from chenghao-intel/unsafe_spill.

(cherry picked from commit e048111)
Signed-off-by: Andrew Or <[email protected]>
@asfgit asfgit closed this in e048111 Sep 10, 2015
@chenghao-intel chenghao-intel deleted the unsafe_spill branch September 11, 2015 01:16
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016