From 2afaf3599a71f13f52f4cb28b307f93dfe5db61f Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 3 Dec 2014 01:59:00 -0800 Subject: [PATCH 1/3] [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file. --- .../storage/ShuffleBlockFetcherIterator.scala | 31 +++++++++----- .../spark/ShuffleFaultToleranceSuite.scala | 40 +++++++++++++++++++ 2 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 83170f7c5a4ab..48f2dc1cb1cff 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import java.io.{InputStream, IOException} import java.util.concurrent.LinkedBlockingQueue import scala.collection.mutable.{ArrayBuffer, HashSet, Queue} @@ -289,17 +290,25 @@ final class ShuffleBlockFetcherIterator( } val iteratorTry: Try[Iterator[Any]] = result match { - case FailureFetchResult(_, e) => Failure(e) - case SuccessFetchResult(blockId, _, buf) => { - val is = blockManager.wrapForCompression(blockId, buf.createInputStream()) - val iter = serializer.newInstance().deserializeStream(is).asIterator - Success(CompletionIterator[Any, Iterator[Any]](iter, { - // Once the iterator is exhausted, release the buffer and set currentResult to null - // so we don't release it again in cleanup. - currentResult = null - buf.release() - })) - } + case FailureFetchResult(_, e) => + Failure(e) + case SuccessFetchResult(blockId, _, buf) => + // There is a chance that createInputStream can fail (e.g. fetching a local file that does + // not exist, SPARK-4085). In that case, we should propagate the right exception so + // the scheduler gets a FetchFailedException. + Try(buf.createInputStream()) match { + case Success(is0) => + val is = blockManager.wrapForCompression(blockId, is0) + val iter = serializer.newInstance().deserializeStream(is).asIterator + Success(CompletionIterator[Any, Iterator[Any]](iter, { + // Once the iterator is exhausted, release the buffer and set currentResult to null + // so we don't release it again in cleanup. + currentResult = null + buf.release() + })) + case Failure(e) => + Failure(e) + } } (result.blockId, iteratorTry) diff --git a/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala new file mode 100644 index 0000000000000..217acf0b733c0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.scalatest.FunSuite + +import org.apache.spark.storage.ShuffleBlockId + + +class ShuffleFaultToleranceSuite extends FunSuite { + + test("[SPARK-4085] hash shuffle manager recovers when local shuffle files get deleted") { + val conf = new SparkConf(false) + conf.set("spark.shuffle.manager", "hash") + val sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _) + rdd.count() + + // Delete one of the local shuffle blocks. + sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)).delete() + rdd.count() + + sc.stop() + } +} From f9814d929170583163bbc53acc7db954f43209ba Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 3 Dec 2014 13:55:46 -0800 Subject: [PATCH 2/3] Code review feedback. --- .../storage/ShuffleBlockFetcherIterator.scala | 21 ++++++++----------- .../spark/ShuffleFaultToleranceSuite.scala | 12 ++++++----- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 48f2dc1cb1cff..2499c11a65b0e 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -296,18 +296,15 @@ final class ShuffleBlockFetcherIterator( // There is a chance that createInputStream can fail (e.g. fetching a local file that does // not exist, SPARK-4085). In that case, we should propagate the right exception so // the scheduler gets a FetchFailedException. - Try(buf.createInputStream()) match { - case Success(is0) => - val is = blockManager.wrapForCompression(blockId, is0) - val iter = serializer.newInstance().deserializeStream(is).asIterator - Success(CompletionIterator[Any, Iterator[Any]](iter, { - // Once the iterator is exhausted, release the buffer and set currentResult to null - // so we don't release it again in cleanup. - currentResult = null - buf.release() - })) - case Failure(e) => - Failure(e) + Try(buf.createInputStream()).map { is0 => + val is = blockManager.wrapForCompression(blockId, is0) + val iter = serializer.newInstance().deserializeStream(is).asIterator + CompletionIterator[Any, Iterator[Any]](iter, { + // Once the iterator is exhausted, release the buffer and set currentResult to null + // so we don't release it again in cleanup. + currentResult = null + buf.release() + }) } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala index 217acf0b733c0..48c9dbbaadd9b 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala @@ -22,19 +22,21 @@ import org.scalatest.FunSuite import org.apache.spark.storage.ShuffleBlockId -class ShuffleFaultToleranceSuite extends FunSuite { +class ShuffleFaultToleranceSuite extends FunSuite with LocalSparkContext { test("[SPARK-4085] hash shuffle manager recovers when local shuffle files get deleted") { val conf = new SparkConf(false) conf.set("spark.shuffle.manager", "hash") - val sc = new SparkContext("local", "test", conf) + sc = new SparkContext("local", "test", conf) val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _) rdd.count() // Delete one of the local shuffle blocks. - sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)).delete() - rdd.count() + val shuffleFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) + assert(shuffleFile.exists()) + shuffleFile.delete() - sc.stop() + // This count should retry the execution of the previous stage and rerun shuffle. + rdd.count() } } From 255b4fd8faf241244591a21273ea1e2e3d97b5c9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 3 Dec 2014 15:04:50 -0800 Subject: [PATCH 3/3] Updated test. --- .../spark/ExternalShuffleServiceSuite.scala | 2 - .../spark/ShuffleFaultToleranceSuite.scala | 42 ------------------- .../scala/org/apache/spark/ShuffleSuite.scala | 23 ++++++++++ 3 files changed, 23 insertions(+), 44 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index cc3592ee43a35..bac6fdbcdc976 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark -import java.util.concurrent.atomic.AtomicInteger - import org.scalatest.BeforeAndAfterAll import org.apache.spark.network.TransportContext diff --git a/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala deleted file mode 100644 index 48c9dbbaadd9b..0000000000000 --- a/core/src/test/scala/org/apache/spark/ShuffleFaultToleranceSuite.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import org.scalatest.FunSuite - -import org.apache.spark.storage.ShuffleBlockId - - -class ShuffleFaultToleranceSuite extends FunSuite with LocalSparkContext { - - test("[SPARK-4085] hash shuffle manager recovers when local shuffle files get deleted") { - val conf = new SparkConf(false) - conf.set("spark.shuffle.manager", "hash") - sc = new SparkContext("local", "test", conf) - val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _) - rdd.count() - - // Delete one of the local shuffle blocks. - val shuffleFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) - assert(shuffleFile.exists()) - shuffleFile.delete() - - // This count should retry the execution of the previous stage and rerun shuffle. - rdd.count() - } -} diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 5a133c0490444..58a96245a9b53 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.Matchers import org.apache.spark.ShuffleSuite.NonJavaSerializableClass import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD} import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId} import org.apache.spark.util.MutablePair abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { @@ -263,6 +264,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex } } } + + test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") { + val myConf = conf.clone().set("spark.test.noStageRetry", "false") + sc = new SparkContext("local", "test", myConf) + val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _) + rdd.count() + + // Delete one of the local shuffle blocks. + val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) + val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0)) + assert(hashFile.exists() || sortFile.exists()) + + if (hashFile.exists()) { + hashFile.delete() + } + if (sortFile.exists()) { + sortFile.delete() + } + + // This count should retry the execution of the previous stage and rerun shuffle. + rdd.count() + } } object ShuffleSuite {