From b8ff8cb5ebf9ac575d1f7ef6bf9a7c4784b8a5c8 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:01:23 +0000 Subject: [PATCH 1/6] Replace deprecated Ant with --- core/pom.xml | 4 ++-- repl/pom.xml | 4 ++-- yarn/pom.xml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index ebc178a10541a..a333bff28c246 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -225,7 +225,7 @@ true - + @@ -238,7 +238,7 @@ - + diff --git a/repl/pom.xml b/repl/pom.xml index 73597f635b9e0..4c5f9720c802a 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -98,7 +98,7 @@ true - + @@ -111,7 +111,7 @@ - + diff --git a/yarn/pom.xml b/yarn/pom.xml index e7eba36ba351b..c0e133dd603b1 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -133,7 +133,7 @@ true - + @@ -146,7 +146,7 @@ - + From 007762ba2ffa9c3b396de95e64652935a07e74e0 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:02:13 +0000 Subject: [PATCH 2/6] Remove dead scaladoc links --- .../scala/org/apache/spark/bagel/Bagel.scala | 16 ++++++++-------- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/util/IndestructibleActorSystem.scala | 4 ++-- .../org/apache/spark/util/StatCounter.scala | 3 +-- .../scala/org/apache/spark/util/Vector.scala | 2 +- .../streaming/api/java/JavaPairDStream.scala | 16 ++++++++-------- .../api/java/JavaStreamingContext.scala | 4 ++-- .../streaming/dstream/PairDStreamFunctions.scala | 16 ++++++++-------- 8 files changed, 31 insertions(+), 32 deletions(-) diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index dd3eed8affe39..8c60fd9aa88cb 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -27,7 +27,7 @@ object Bagel extends Logging { /** * Runs a Bagel program. - * @param sc [[org.apache.spark.SparkContext]] to use for the program. + * @param sc org.apache.spark.SparkContext to use for the program. * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the * Key will be the vertex id. * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often @@ -38,12 +38,12 @@ object Bagel extends Logging { * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices * after each superstep and provides the result to each vertex in the next * superstep. - * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key + * @param partitioner org.apache.spark.Partitioner partitions values by key * @param numPartitions number of partitions across which to split the graph. * Default is the default parallelism of the SparkContext - * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of + * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of * intermediate RDDs in each superstep. Defaults to caching in memory. - * @param compute function that takes a Vertex, optional set of (possibly combined) messages to + *@param compute function that takes a Vertex, optional set of (possibly combined) messages to * the Vertex, optional Aggregator and the current superstep, * and returns a set of (Vertex, outgoing Messages) pairs * @tparam K key @@ -131,7 +131,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default - * [[org.apache.spark.HashPartitioner]] and default storage level + * org.apache.spark.HashPartitioner and default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, @@ -146,7 +146,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the - * default [[org.apache.spark.HashPartitioner]] + * default org.apache.spark.HashPartitioner */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, @@ -166,7 +166,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], - * default [[org.apache.spark.HashPartitioner]], + * default org.apache.spark.HashPartitioner, * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( @@ -180,7 +180,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], - * the default [[org.apache.spark.HashPartitioner]] + * the default org.apache.spark.HashPartitioner * and [[org.apache.spark.bagel.DefaultCombiner]] */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1f5334f3dbb40..da778aa851cd2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -351,7 +351,7 @@ class SparkContext( * using the older MapReduce API (`org.apache.hadoop.mapred`). * * @param conf JobConf for setting up the dataset - * @param inputFormatClass Class of the [[InputFormat]] + * @param inputFormatClass Class of the InputFormat * @param keyClass Class of the keys * @param valueClass Class of the values * @param minSplits Minimum number of Hadoop Splits to generate. diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index bf71882ef770a..c539d2f708f95 100644 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.typesafe.config.Config /** - * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception. + * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception * This is necessary as Spark Executors are allowed to recover from fatal exceptions - * (see [[org.apache.spark.executor.Executor]]). + * (see org.apache.spark.executor.Executor) */ object IndestructibleActorSystem { def apply(name: String, config: Config): ActorSystem = diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala index 5b0d2c36510b8..17e4d1e6f9933 100644 --- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala +++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala @@ -20,8 +20,7 @@ package org.apache.spark.util /** * A class for tracking the statistics of a set of numbers (count, mean and variance) in a * numerically robust way. Includes support for merging two StatCounters. Based on - * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - * Welford and Chan's algorithms for running variance]]. + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]]. * * @constructor Initialize the StatCounter with the given values. */ diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index d437c055f33d4..dc4b8f253f259 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -136,7 +136,7 @@ object Vector { /** * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers - * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided. + * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided. */ def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble()) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 4dcd0e4c51ec3..2c7ff87744d7a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]] + * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = @@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * thepartitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { @@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W]( other: JavaPairDStream[K, W], @@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def leftOuterJoin[W]( other: JavaPairDStream[K, W], @@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def rightOuterJoin[W]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 2268160dccc1f..b082bb058529b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[T]( dstreams: JList[JavaDStream[_]], @@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[K, V]( dstreams: JList[JavaDStream[_]], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index f3c58aede092a..2473496949360 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `groupByKey` on each RDD. The supplied - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { @@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. Note, that * this function may generate a different a tuple with a different key @@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs. */ def cogroup[W: ClassTag]( other: DStream[(K, W)], @@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W: ClassTag]( other: DStream[(K, W)], @@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def leftOuterJoin[W: ClassTag]( @@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def rightOuterJoin[W: ClassTag]( From 5b2fce25a9620d3e7f36c311c38c44f85c7684a8 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:03:41 +0000 Subject: [PATCH 3/6] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates --- pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 4f1e8398d9b8d..1d97dd3402b09 100644 --- a/pom.xml +++ b/pom.xml @@ -613,12 +613,13 @@ org.apache.maven.plugins maven-compiler-plugin - 2.5.1 + 3.1 ${java.version} ${java.version} UTF-8 1024m + true @@ -633,7 +634,7 @@ org.scalatest scalatest-maven-plugin - 1.0-M2 + 1.0-RC2 ${project.build.directory}/surefire-reports . From 254e8efb811390518298fc5532b9bbeaca66fc77 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:04:20 +0000 Subject: [PATCH 4/6] Fix one new style error introduced in scaladoc warning commit --- core/src/main/scala/org/apache/spark/util/StatCounter.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala index 17e4d1e6f9933..f837dc7ccc860 100644 --- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala +++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala @@ -19,8 +19,9 @@ package org.apache.spark.util /** * A class for tracking the statistics of a set of numbers (count, mean and variance) in a - * numerically robust way. Includes support for merging two StatCounters. Based on - * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]]. + * numerically robust way. Includes support for merging two StatCounters. Based on Welford + * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]] + * for running variance. * * @constructor Initialize the StatCounter with the given values. */ From f35b833041dcbc12b95898b7be2a43c77ec14368 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:05:14 +0000 Subject: [PATCH 5/6] Fix two misc javadoc problems --- bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 2 +- core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 8c60fd9aa88cb..70c7474a936dc 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -43,7 +43,7 @@ object Bagel extends Logging { * Default is the default parallelism of the SparkContext * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of * intermediate RDDs in each superstep. Defaults to caching in memory. - *@param compute function that takes a Vertex, optional set of (possibly combined) messages to + * @param compute function that takes a Vertex, optional set of (possibly combined) messages to * the Vertex, optional Aggregator and the current superstep, * and returns a set of (Vertex, outgoing Messages) pairs * @tparam K key diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 9d75d7c4ad69a..006e2a3335428 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String) /** * Create a log file for one job * @param jobID ID of the job - * @exception FileNotFoundException Fail to create log file + * @throws FileNotFoundException Fail to create log file */ protected def createLogWriter(jobID: Int) { try { From 6c4a32c2e2b9bc57d249bb72fad9fc2adc12cb69 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 27 Feb 2014 08:05:54 +0000 Subject: [PATCH 6/6] Suppress warnings about legitimate unchecked array creations, or change code to avoid it --- .../java/org/apache/spark/JavaAPISuite.java | 35 +++-- .../apache/spark/streaming/JavaAPISuite.java | 124 ++++++++++++------ 2 files changed, 112 insertions(+), 47 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 20232e9fbb8d0..aa5079c159830 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) { else if (a < b) return 1; else return 0; } - }; + } + @SuppressWarnings("unchecked") @Test public void sparkContextUnion() { // Union of non-specialized JavaRDDs @@ -148,6 +149,7 @@ public void call(String s) { Assert.assertEquals(2, foreachCalls); } + @SuppressWarnings("unchecked") @Test public void lookup() { JavaPairRDD categories = sc.parallelizePairs(Arrays.asList( @@ -179,6 +181,7 @@ public Boolean call(Integer x) { Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds } + @SuppressWarnings("unchecked") @Test public void cogroup() { JavaPairRDD categories = sc.parallelizePairs(Arrays.asList( @@ -197,6 +200,7 @@ public void cogroup() { cogrouped.collect(); } + @SuppressWarnings("unchecked") @Test public void leftOuterJoin() { JavaPairRDD rdd1 = sc.parallelizePairs(Arrays.asList( @@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) { Assert.assertEquals(33, sum); } + @SuppressWarnings("unchecked") @Test public void foldByKey() { List> pairs = Arrays.asList( @@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) { Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); } + @SuppressWarnings("unchecked") @Test public void reduceByKey() { List> pairs = Arrays.asList( @@ -320,8 +326,8 @@ public void approximateResults() { public void take() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Assert.assertEquals(1, rdd.first().intValue()); - List firstTwo = rdd.take(2); - List sample = rdd.takeSample(false, 2, 42); + rdd.take(2); + rdd.takeSample(false, 2, 42); } @Test @@ -359,8 +365,8 @@ public Boolean call(Double x) { Assert.assertEquals(2.49444, rdd.stdev(), 0.01); Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01); - Double first = rdd.first(); - List take = rdd.take(5); + rdd.first(); + rdd.take(5); } @Test @@ -438,11 +444,11 @@ public Iterable call(String s) { return lengths; } }); - Double x = doubles.first(); - Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01); + Assert.assertEquals(5.0, doubles.first(), 0.01); Assert.assertEquals(11, pairs.count()); } + @SuppressWarnings("unchecked") @Test public void mapsFromPairsToPairs() { List> pairs = Arrays.asList( @@ -509,6 +515,7 @@ public void repartition() { } } + @SuppressWarnings("unchecked") @Test public void persist() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); @@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException { Assert.assertEquals(expected, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void sequenceFile() { File tempDir = Files.createTempDir(); @@ -602,6 +610,7 @@ public Tuple2 call(Tuple2 pair) { Assert.assertEquals(pairs, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void writeWithNewAPIHadoopFile() { File tempDir = Files.createTempDir(); @@ -632,6 +641,7 @@ public String call(Tuple2 x) { }).collect().toString()); } + @SuppressWarnings("unchecked") @Test public void readWithNewAPIHadoopFile() throws IOException { File tempDir = Files.createTempDir(); @@ -674,6 +684,7 @@ public void objectFilesOfInts() { Assert.assertEquals(expected, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void objectFilesOfComplexTypes() { File tempDir = Files.createTempDir(); @@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() { Assert.assertEquals(pairs, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void hadoopFile() { File tempDir = Files.createTempDir(); @@ -719,6 +731,7 @@ public String call(Tuple2 x) { }).collect().toString()); } + @SuppressWarnings("unchecked") @Test public void hadoopFileCompressed() { File tempDir = Files.createTempDir(); @@ -824,7 +837,7 @@ public Float zero(Float initialValue) { } }; - final Accumulator floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam); + final Accumulator floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); rdd.foreach(new VoidFunction() { public void call(Integer x) { floatAccum.add((float) x); @@ -876,6 +889,7 @@ public void checkpointAndRestore() { Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); } + @SuppressWarnings("unchecked") @Test public void mapOnPairRDD() { JavaRDD rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); @@ -900,6 +914,7 @@ public Tuple2 call(Tuple2 in) throws Excepti } + @SuppressWarnings("unchecked") @Test public void collectPartitions() { JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); @@ -968,7 +983,7 @@ public void countApproxDistinctByKey() { @Test public void collectAsMapWithIntArrayValues() { // Regression test for SPARK-1040 - JavaRDD rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 })); + JavaRDD rdd = sc.parallelize(Arrays.asList(1)); JavaPairRDD pairRDD = rdd.map(new PairFunction() { @Override public Tuple2 call(Integer x) throws Exception { @@ -976,6 +991,6 @@ public Tuple2 call(Integer x) throws Exception { } }); pairRDD.collect(); // Works fine - Map map = pairRDD.collectAsMap(); // Used to crash with ClassCastException + pairRDD.collectAsMap(); // Used to crash with ClassCastException } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4fbbce9b8b90e..54a0791d04ea4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -19,7 +19,6 @@ import scala.Tuple2; -import org.junit.After; import org.junit.Assert; import org.junit.Test; import java.io.*; @@ -30,7 +29,6 @@ import com.google.common.io.Files; import com.google.common.collect.Sets; -import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -38,6 +36,7 @@ import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -45,6 +44,8 @@ // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + + @SuppressWarnings("unchecked") @Test public void testCount() { List> inputData = Arrays.asList( @@ -64,6 +65,7 @@ public void testCount() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMap() { List> inputData = Arrays.asList( @@ -87,6 +89,7 @@ public Integer call(String s) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindow() { List> inputData = Arrays.asList( @@ -108,6 +111,7 @@ public void testWindow() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindowWithSlideDuration() { List> inputData = Arrays.asList( @@ -132,6 +136,7 @@ public void testWindowWithSlideDuration() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFilter() { List> inputData = Arrays.asList( @@ -155,13 +160,16 @@ public Boolean call(String s) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testRepartitionMorePartitions() { List> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); - JavaDStream repartitioned = stream.repartition(4); + JavaDStream stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStreamLike,JavaRDD> repartitioned = + stream.repartition(4); JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -172,13 +180,16 @@ public void testRepartitionMorePartitions() { } } + @SuppressWarnings("unchecked") @Test public void testRepartitionFewerPartitions() { List> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); - JavaDStream repartitioned = stream.repartition(2); + JavaDStream stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStreamLike,JavaRDD> repartitioned = + stream.repartition(2); JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -188,6 +199,7 @@ public void testRepartitionFewerPartitions() { } } + @SuppressWarnings("unchecked") @Test public void testGlom() { List> inputData = Arrays.asList( @@ -206,6 +218,7 @@ public void testGlom() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMapPartitions() { List> inputData = Arrays.asList( @@ -217,16 +230,17 @@ public void testMapPartitions() { Arrays.asList("YANKEESRED SOCKS")); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { - @Override - public Iterable call(Iterator in) { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - } - }); + JavaDStream mapped = stream.mapPartitions( + new FlatMapFunction, String>() { + @Override + public Iterable call(Iterator in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); JavaTestUtils.attachTestOutputStream(mapped); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -247,6 +261,7 @@ public Integer call(Integer i1, Integer i2) throws Exception { } } + @SuppressWarnings("unchecked") @Test public void testReduce() { List> inputData = Arrays.asList( @@ -267,6 +282,7 @@ public void testReduce() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByWindow() { List> inputData = Arrays.asList( @@ -289,6 +305,7 @@ public void testReduceByWindow() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testQueueStream() { List> expected = Arrays.asList( @@ -312,6 +329,7 @@ public void testQueueStream() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testTransform() { List> inputData = Arrays.asList( @@ -344,6 +362,7 @@ public Integer call(Integer i) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testVariousTransform() { // tests whether all variations of transform can be called from Java @@ -423,6 +442,7 @@ public JavaRDD call(JavaRDD in) throws Exception { } + @SuppressWarnings("unchecked") @Test public void testTransformWith() { List>> stringStringKVStream1 = Arrays.asList( @@ -492,6 +512,7 @@ public JavaPairRDD> call( } + @SuppressWarnings("unchecked") @Test public void testVariousTransformWith() { // tests whether all variations of transformWith can be called from Java @@ -591,6 +612,7 @@ public JavaPairRDD call(JavaPairRDD rdd1, JavaP ); } + @SuppressWarnings("unchecked") @Test public void testStreamingContextTransform(){ List> stream1input = Arrays.asList( @@ -658,6 +680,7 @@ public Tuple2 call(Integer i) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMap() { List> inputData = Arrays.asList( @@ -683,6 +706,7 @@ public Iterable call(String x) { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairFlatMap() { List> inputData = Arrays.asList( @@ -718,22 +742,24 @@ public void testPairFlatMap() { new Tuple2(9, "s"))); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { - @Override - public Iterable> call(String in) throws Exception { - List> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2(in.length(), letter)); - } - return out; - } - }); + JavaPairDStream flatMapped = stream.flatMap( + new PairFlatMapFunction() { + @Override + public Iterable> call(String in) throws Exception { + List> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2(in.length(), letter)); + } + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUnion() { List> inputData1 = Arrays.asList( @@ -778,6 +804,7 @@ public static > void assertOrderInvariantEquals( // PairDStream Functions + @SuppressWarnings("unchecked") @Test public void testPairFilter() { List> inputData = Arrays.asList( @@ -810,7 +837,8 @@ public Boolean call(Tuple2 in) throws Exception { Assert.assertEquals(expected, result); } - List>> stringStringKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List>> stringStringKVStream = Arrays.asList( Arrays.asList(new Tuple2("california", "dodgers"), new Tuple2("california", "giants"), new Tuple2("new york", "yankees"), @@ -820,7 +848,8 @@ public Boolean call(Tuple2 in) throws Exception { new Tuple2("new york", "rangers"), new Tuple2("new york", "islanders"))); - List>> stringIntKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List>> stringIntKVStream = Arrays.asList( Arrays.asList( new Tuple2("california", 1), new Tuple2("california", 3), @@ -832,6 +861,7 @@ public Boolean call(Tuple2 in) throws Exception { new Tuple2("new york", 3), new Tuple2("new york", 1))); + @SuppressWarnings("unchecked") @Test public void testPairMap() { // Maps pair -> pair of different type List>> inputData = stringIntKVStream; @@ -864,6 +894,7 @@ public Tuple2 call(Tuple2 in) throws Exception Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMapPartitions() { // Maps pair -> pair of different type List>> inputData = stringIntKVStream; @@ -901,6 +932,7 @@ public Iterable> call(Iterator> Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMap2() { // Maps pair -> single List>> inputData = stringIntKVStream; @@ -925,6 +957,7 @@ public Integer call(Tuple2 in) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List>> inputData = Arrays.asList( @@ -967,6 +1000,7 @@ public Iterable> call(Tuple2 in) throws Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairGroupByKey() { List>> inputData = stringStringKVStream; @@ -989,6 +1023,7 @@ public void testPairGroupByKey() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairReduceByKey() { List>> inputData = stringIntKVStream; @@ -1013,6 +1048,7 @@ public void testPairReduceByKey() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCombineByKey() { List>> inputData = stringIntKVStream; @@ -1043,6 +1079,7 @@ public Integer call(Integer i) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValue() { List> inputData = Arrays.asList( @@ -1068,6 +1105,7 @@ public void testCountByValue() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testGroupByKeyAndWindow() { List>> inputData = stringIntKVStream; @@ -1113,6 +1151,7 @@ private Tuple2> convert(Tuple2> t return new Tuple2>(tuple._1(), new HashSet(tuple._2())); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindow() { List>> inputData = stringIntKVStream; @@ -1136,6 +1175,7 @@ public void testReduceByKeyAndWindow() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUpdateStateByKey() { List>> inputData = stringIntKVStream; @@ -1171,6 +1211,7 @@ public Optional call(List values, Optional state) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindowWithInverse() { List>> inputData = stringIntKVStream; @@ -1194,6 +1235,7 @@ public void testReduceByKeyAndWindowWithInverse() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValueAndWindow() { List> inputData = Arrays.asList( @@ -1227,6 +1269,7 @@ public void testCountByValueAndWindow() { Assert.assertEquals(expected, unorderedResult); } + @SuppressWarnings("unchecked") @Test public void testPairTransform() { List>> inputData = Arrays.asList( @@ -1271,6 +1314,7 @@ public JavaPairRDD call(JavaPairRDD in) thro Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToNormalRDDTransform() { List>> inputData = Arrays.asList( @@ -1312,6 +1356,8 @@ public Integer call(Tuple2 in) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") + @Test public void testMapValues() { List>> inputData = stringStringKVStream; @@ -1342,6 +1388,7 @@ public String call(String s) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMapValues() { List>> inputData = stringStringKVStream; @@ -1386,6 +1433,7 @@ public Iterable call(String in) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCoGroup() { List>> stringStringKVStream1 = Arrays.asList( @@ -1429,6 +1477,7 @@ public void testCoGroup() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testJoin() { List>> stringStringKVStream1 = Arrays.asList( @@ -1472,6 +1521,7 @@ public void testJoin() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testLeftOuterJoin() { List>> stringStringKVStream1 = Arrays.asList( @@ -1503,6 +1553,7 @@ public void testLeftOuterJoin() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCheckpointMasterRecovery() throws InterruptedException { List> inputData = Arrays.asList( @@ -1541,7 +1592,8 @@ public Integer call(String s) throws Exception { } - /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @SuppressWarnings("unchecked") @Test public void testCheckpointofIndividualStream() throws InterruptedException { List> inputData = Arrays.asList( @@ -1581,16 +1633,14 @@ public void testSocketTextStream() { @Test public void testSocketString() { class Converter extends Function> { - public Iterable call(InputStream in) { + public Iterable call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); List out = new ArrayList(); - try { - while (true) { - String line = reader.readLine(); - if (line == null) { break; } - out.add(line); - } - } catch (IOException e) { } + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } return out; } }