
Commit 9fac6f3

Author: Andrew Or (committed)
Re-implement scopes through annotations instead
The previous "working" implementation frequently ran into NotSerializableExceptions. Why? ClosureCleaner doesn't like closures being wrapped in other closures, and such closures are simply not cleaned (details are intentionally omitted here). This commit re-implements scoping through annotations. All methods that should be scoped are now annotated with @RDDScope. Then, on creation, each RDD derives its scope from the stack trace, similar to how it derives its call site. This is the cleanest approach that bypasses NotSerializableExceptions while imposing the fewest significant limitations.
1 parent f22f337 commit 9fac6f3
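
The idea described above, sketched below for illustration only: when an RDD is constructed, walk the current stack trace and find the innermost caller method that carries @RDDScope; that method names the RDD's scope, much like the existing call-site derivation. The helper object RDDScopeUtils and its deriveScope method are hypothetical names, not part of this commit, which performs the equivalent lookup during RDD creation.

import org.apache.spark.annotation.RDDScope

// Hypothetical helper (not in this commit): derive a scope name from the
// current stack trace by locating an @RDDScope-annotated caller method.
object RDDScopeUtils {
  // Returns "ClassName.methodName" of the innermost annotated caller, if any.
  def deriveScope(): Option[String] = {
    Thread.currentThread().getStackTrace.toSeq.flatMap { frame =>
      // Skip frames whose classes cannot be resolved (e.g. synthetic frames).
      scala.util.Try(Class.forName(frame.getClassName)).toOption.flatMap { cls =>
        cls.getDeclaredMethods
          .find(m => m.getName == frame.getMethodName && m.isAnnotationPresent(classOf[RDDScope]))
          .map(m => s"${cls.getSimpleName}.${m.getName}")
      }
    }.headOption
  }
}

Because the annotation is retained at runtime (see the new RDDScope.java below), a reflective lookup of this kind needs no wrapping closures, which is what avoids the NotSerializableException problem described above.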

File tree

9 files changed, +502 −277 lines changed


core/src/main/resources/org/apache/spark/ui/static/viz.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 24 additions & 3 deletions
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -641,6 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
    * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
+  @RDDScope
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
@@ -650,13 +651,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * This method is identical to `parallelize`.
    */
+  @RDDScope
   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     parallelize(seq, numSlices)
   }
 
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
     * Create a new partition for each collection item. */
+  @RDDScope
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
@@ -667,10 +670,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
+  @RDDScope
   def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     assertNotStopped()
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString).setName(path)
+      minPartitions).map(pair => pair._2.toString)
   }
 
   /**
@@ -700,6 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
+  @RDDScope
   def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
     RDD[(String, String)] = {
     assertNotStopped()
@@ -746,6 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Small files are preferred; very large files may cause bad performance.
    */
   @Experimental
+  @RDDScope
   def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
     RDD[(String, PortableDataStream)] = {
     assertNotStopped()
@@ -774,6 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return An RDD of data with values, represented as byte arrays
    */
   @Experimental
+  @RDDScope
   def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
     : RDD[Array[Byte]] = {
     assertNotStopped()
@@ -811,6 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -832,6 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -850,7 +859,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions).setName(path)
+      minPartitions).setName(s"HadoopRDD[$path]")
   }
 
   /**
@@ -867,6 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V, F <: InputFormat[K, V]]
       (path: String, minPartitions: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -891,11 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
     hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
+  @RDDScope
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
       (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -916,6 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
@@ -949,6 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
@@ -969,6 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
@@ -987,6 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    * */
+  @RDDScope
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
     sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
@@ -1014,6 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
    * copy them using a `map` function.
    */
+  @RDDScope
   def sequenceFile[K, V]
       (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
@@ -1037,6 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * though the nice thing about it is that there's very little effort required to save arbitrary
    * objects.
    */
+  @RDDScope
   def objectFile[T: ClassTag](
       path: String,
       minPartitions: Int = defaultMinPartitions
@@ -1046,13 +1064,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
   }
 
+  @RDDScope
   protected[spark] def checkpointFile[T: ClassTag](
       path: String
     ): RDD[T] = {
     new CheckpointRDD[T](this, path)
   }
 
   /** Build the union of a list of RDDs. */
+  @RDDScope
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
     if (partitioners.size == 1) {
@@ -1063,6 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /** Build the union of a list of RDDs passed as variable-length arguments. */
+  @RDDScope
   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
     union(Seq(first) ++ rest)
 
org/apache/spark/annotation/RDDScope.java (new file)

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * Blah blah blah blah blah.
+ * This should really be private and not displayed on the docs.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+public @interface RDDScope {}
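
The RUNTIME retention and METHOD target are what make the stack-trace lookup possible: the annotation remains visible to reflection at the moment an RDD is created. As a quick illustration (not part of this commit, and only compilable against a build that includes this patch), the following sketch lists every SparkContext method marked as an RDD scope; the object name ListScopedMethods is made up.

import org.apache.spark.SparkContext
import org.apache.spark.annotation.RDDScope

// Illustration only: print the names of SparkContext methods carrying @RDDScope.
object ListScopedMethods {
  def main(args: Array[String]): Unit = {
    classOf[SparkContext].getDeclaredMethods
      .filter(_.isAnnotationPresent(classOf[RDDScope]))
      .map(_.getName)
      .distinct
      .sorted
      .foreach(println)  // expected to include parallelize, textFile, union, ...
  }
}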

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 6 additions & 0 deletions
@@ -24,6 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
+import org.apache.spark.annotation.RDDScope
 
 /**
  * A set of asynchronous RDD actions available through an implicit conversion.
@@ -33,6 +34,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for counting the number of elements in the RDD.
    */
+  @RDDScope
   def countAsync(): FutureAction[Long] = {
     val totalCount = new AtomicLong
     self.context.submitJob(
@@ -53,6 +55,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
+  @RDDScope
   def collectAsync(): FutureAction[Seq[T]] = {
     val results = new Array[Array[T]](self.partitions.length)
     self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
@@ -62,6 +65,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for retrieving the first num elements of the RDD.
    */
+  @RDDScope
   def takeAsync(num: Int): FutureAction[Seq[T]] = {
     val f = new ComplexFutureAction[Seq[T]]
 
@@ -109,6 +113,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Applies a function f to all elements of this RDD.
    */
+  @RDDScope
   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
     val cleanF = self.context.clean(f)
     self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
@@ -118,6 +123,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Applies a function f to each partition of this RDD.
    */
+  @RDDScope
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
     self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
