Commit fe28ee2

rxin authored and pwendell committed
[SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.
cc aarondav kayousterhout pwendell  This should go into 1.2?

Author: Reynold Xin <[email protected]>

Closes #3579 from rxin/SPARK-4085 and squashes the following commits:

255b4fd [Reynold Xin] Updated test.
f9814d9 [Reynold Xin] Code review feedback.
2afaf35 [Reynold Xin] [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.

(cherry picked from commit 1826372)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 6b6b779 commit fe28ee2

File tree

3 files changed: +40 -13 lines changed


core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 17 additions & 11 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.spark.storage
 
+import java.io.{InputStream, IOException}
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
@@ -289,17 +290,22 @@ final class ShuffleBlockFetcherIterator(
     }
 
     val iteratorTry: Try[Iterator[Any]] = result match {
-      case FailureFetchResult(_, e) => Failure(e)
-      case SuccessFetchResult(blockId, _, buf) => {
-        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
-        val iter = serializer.newInstance().deserializeStream(is).asIterator
-        Success(CompletionIterator[Any, Iterator[Any]](iter, {
-          // Once the iterator is exhausted, release the buffer and set currentResult to null
-          // so we don't release it again in cleanup.
-          currentResult = null
-          buf.release()
-        }))
-      }
+      case FailureFetchResult(_, e) =>
+        Failure(e)
+      case SuccessFetchResult(blockId, _, buf) =>
+        // There is a chance that createInputStream can fail (e.g. fetching a local file that does
+        // not exist, SPARK-4085). In that case, we should propagate the right exception so
+        // the scheduler gets a FetchFailedException.
+        Try(buf.createInputStream()).map { is0 =>
+          val is = blockManager.wrapForCompression(blockId, is0)
+          val iter = serializer.newInstance().deserializeStream(is).asIterator
+          CompletionIterator[Any, Iterator[Any]](iter, {
+            // Once the iterator is exhausted, release the buffer and set currentResult to null
+            // so we don't release it again in cleanup.
+            currentResult = null
+            buf.release()
+          })
+        }
     }
 
     (result.blockId, iteratorTry)
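
Note: the core of the fix is wrapping buf.createInputStream() in scala.util.Try, so an IOException from a missing local shuffle file becomes a Failure value rather than an exception that escapes with the wrong type. A minimal standalone sketch of that pattern follows; the names openLocalBlock and readFirstByte are hypothetical stand-ins, not Spark APIs.

import java.io.{ByteArrayInputStream, IOException, InputStream}

import scala.util.{Failure, Success, Try}

object TryPropagationSketch {
  // Stand-in for ManagedBuffer.createInputStream: throws if the backing
  // local file is gone, the failure mode SPARK-4085 is about.
  def openLocalBlock(fileExists: Boolean): InputStream =
    if (fileExists) new ByteArrayInputStream(Array[Byte](42))
    else throw new IOException("local shuffle file not found")

  // Wrapping the call in Try turns a thrown IOException into Failure(e)
  // instead of letting it propagate, mirroring the new code above.
  def readFirstByte(fileExists: Boolean): Try[Int] =
    Try(openLocalBlock(fileExists)).map { is =>
      try is.read() finally is.close()
    }

  def main(args: Array[String]): Unit = {
    readFirstByte(fileExists = true) match {
      case Success(b) => println(s"read byte: $b")   // read byte: 42
      case Failure(e) => println(s"fetch failed: $e")
    }
    // Prints Failure(java.io.IOException: ...): the caller can translate
    // this into a FetchFailedException for the scheduler.
    println(readFirstByte(fileExists = false))
  }
}

Because Try.map only runs its body on Success, the deserialization and CompletionIterator cleanup hook above are skipped entirely when the stream cannot be opened, and the Failure carries the original exception up to where the FetchFailedException is raised.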

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 0 additions & 2 deletions

@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkContext._

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 23 additions & 0 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId}
 import org.apache.spark.util.MutablePair
 
 abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -264,6 +265,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
       }
     }
   }
+
+  test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
+    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
+    rdd.count()
+
+    // Delete one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    assert(hashFile.exists() || sortFile.exists())
+
+    if (hashFile.exists()) {
+      hashFile.delete()
+    }
+    if (sortFile.exists()) {
+      sortFile.delete()
+    }
+
+    // This count should retry the execution of the previous stage and rerun shuffle.
+    rdd.count()
+  }
 }
 
 object ShuffleSuite {
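
For context, a hedged sketch of the same scenario as a standalone driver program rather than a suite test. This is an illustration, not part of the commit: it assumes a Spark 1.2-era build, must live in the org.apache.spark package because SparkContext.env is package-private, and reuses the test's block IDs (shuffle 0, map 0, reduce 0).

package org.apache.spark

import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}

// Sketch: deleting the shuffle output between the two counts should now
// surface internally as a FetchFailedException, so the scheduler reruns
// the map stage instead of failing the job with an opaque error.
object Spark4085Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SPARK-4085 repro")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
      rdd.count()

      // Hash-based shuffle writes ShuffleBlockId files; sort-based shuffle
      // writes ShuffleDataBlockId files. Delete whichever one exists.
      val disk = sc.env.blockManager.diskBlockManager
      Seq(disk.getFile(new ShuffleBlockId(0, 0, 0)),
          disk.getFile(new ShuffleDataBlockId(0, 0, 0)))
        .filter(_.exists())
        .foreach(_.delete())

      rdd.count() // retries the previous stage and succeeds
    } finally {
      sc.stop()
    }
  }
}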
