Skip to content

Commit d1966f3

Browse files
committed
[SPARK-3902] [SPARK-3590] Stabilize AsyncRDDActions and add Java API
This PR adds a Java API for AsyncRDDActions and promotes the API from `Experimental` to stable. Author: Josh Rosen <[email protected]> Author: Josh Rosen <[email protected]> Closes #2760 from JoshRosen/async-rdd-actions-in-java and squashes the following commits: 0d45fbc [Josh Rosen] Whitespace fix. ad3ae53 [Josh Rosen] Merge remote-tracking branch 'origin/master' into async-rdd-actions-in-java c0153a5 [Josh Rosen] Remove unused variable. e8e2867 [Josh Rosen] Updates based on Marcelo's review feedback 7a1417f [Josh Rosen] Removed unnecessary java.util import. 6f8f6ac [Josh Rosen] Fix import ordering. ff28e49 [Josh Rosen] Add MiMa excludes and fix a scalastyle error. 346e46e [Josh Rosen] [SPARK-3902] Stabilize AsyncRDDActions; add Java API.
1 parent 7e63bb4 commit d1966f3

File tree

6 files changed

+246
-35
lines changed

6 files changed

+246
-35
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.api.java;
19+
20+
21+
import java.util.List;
22+
import java.util.concurrent.Future;
23+
24+
public interface JavaFutureAction<T> extends Future<T> {
25+
26+
/**
27+
* Returns the job IDs run by the underlying async operation.
28+
*
29+
* This returns the current snapshot of the job list. Certain operations may run multiple
30+
* jobs, so multiple calls to this method may return different lists.
31+
*/
32+
List<Integer> jobIds();
33+
}

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@
1717

1818
package org.apache.spark
1919

20-
import scala.concurrent._
21-
import scala.concurrent.duration.Duration
22-
import scala.util.Try
20+
import java.util.Collections
21+
import java.util.concurrent.TimeUnit
2322

24-
import org.apache.spark.annotation.Experimental
23+
import org.apache.spark.api.java.JavaFutureAction
2524
import org.apache.spark.rdd.RDD
2625
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
2726

27+
import scala.concurrent._
28+
import scala.concurrent.duration.Duration
29+
import scala.util.{Failure, Try}
30+
2831
/**
29-
* :: Experimental ::
3032
* A future for the result of an action to support cancellation. This is an extension of the
3133
* Scala Future interface to support cancellation.
3234
*/
33-
@Experimental
3435
trait FutureAction[T] extends Future[T] {
3536
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
3637
// documentation (with reference to the word "action").
@@ -69,6 +70,11 @@ trait FutureAction[T] extends Future[T] {
6970
*/
7071
override def isCompleted: Boolean
7172

73+
/**
74+
* Returns whether the action has been cancelled.
75+
*/
76+
def isCancelled: Boolean
77+
7278
/**
7379
* The value of this Future.
7480
*
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {
96102

97103

98104
/**
99-
* :: Experimental ::
100105
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
101106
* count, collect, reduce.
102107
*/
103-
@Experimental
104108
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
105109
extends FutureAction[T] {
106110

111+
@volatile private var _cancelled: Boolean = false
112+
107113
override def cancel() {
114+
_cancelled = true
108115
jobWaiter.cancel()
109116
}
110117

@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
143150
}
144151

145152
override def isCompleted: Boolean = jobWaiter.jobFinished
153+
154+
override def isCancelled: Boolean = _cancelled
146155

147156
override def value: Option[Try[T]] = {
148157
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
164173

165174

166175
/**
167-
* :: Experimental ::
168176
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
169177
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
170178
* action thread if it is being blocked by a job.
171179
*/
172-
@Experimental
173180
class ComplexFutureAction[T] extends FutureAction[T] {
174181

175182
// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
222229
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
223230
// command need to be in an atomic block.
224231
val job = this.synchronized {
225-
if (!cancelled) {
232+
if (!isCancelled) {
226233
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
227234
} else {
228235
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
243250
}
244251
}
245252

246-
/**
247-
* Returns whether the promise has been cancelled.
248-
*/
249-
def cancelled: Boolean = _cancelled
253+
override def isCancelled: Boolean = _cancelled
250254

251255
@throws(classOf[InterruptedException])
252256
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
271275
def jobIds = jobs
272276

273277
}
278+
279+
private[spark]
280+
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
281+
extends JavaFutureAction[T] {
282+
283+
import scala.collection.JavaConverters._
284+
285+
override def isCancelled: Boolean = futureAction.isCancelled
286+
287+
override def isDone: Boolean = {
288+
// Per java.util.concurrent.Future's Javadoc, this returns true if the task was completed,
289+
// whether that completion was due to successful execution, an exception, or a cancellation.
290+
futureAction.isCancelled || futureAction.isCompleted
291+
}
292+
293+
override def jobIds(): java.util.List[java.lang.Integer] = {
294+
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
295+
}
296+
297+
private def getImpl(timeout: Duration): T = {
298+
// This will throw TimeoutException on timeout:
299+
Await.ready(futureAction, timeout)
300+
futureAction.value.get match {
301+
case scala.util.Success(value) => converter(value)
302+
case Failure(exception) =>
303+
if (isCancelled) {
304+
throw new CancellationException("Job cancelled").initCause(exception)
305+
} else {
306+
// java.util.concurrent.Future.get() wraps exceptions in ExecutionException
307+
throw new ExecutionException("Exception thrown by job", exception)
308+
}
309+
}
310+
}
311+
312+
override def get(): T = getImpl(Duration.Inf)
313+
314+
override def get(timeout: Long, unit: TimeUnit): T =
315+
getImpl(Duration.fromNanos(unit.toNanos(timeout)))
316+
317+
override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
318+
if (isDone) {
319+
// Per java.util.concurrent.Future's Javadoc, this should return false if the task is completed.
320+
false
321+
} else {
322+
// We're limited in terms of the semantics we can provide here; our cancellation is
323+
// asynchronous and doesn't provide a mechanism to not cancel if the job is running.
324+
futureAction.cancel()
325+
true
326+
}
327+
}
328+
329+
}

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
2121
import java.lang.{Iterable => JIterable, Long => JLong}
2222

2323
import scala.collection.JavaConversions._
24+
import scala.collection.JavaConverters._
2425
import scala.reflect.ClassTag
2526

2627
import com.google.common.base.Optional
2728
import org.apache.hadoop.io.compress.CompressionCodec
2829

29-
import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
30+
import org.apache.spark._
31+
import org.apache.spark.SparkContext._
3032
import org.apache.spark.annotation.Experimental
3133
import org.apache.spark.api.java.JavaPairRDD._
3234
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
294296
* Applies a function f to all elements of this RDD.
295297
*/
296298
def foreach(f: VoidFunction[T]) {
297-
val cleanF = rdd.context.clean((x: T) => f.call(x))
298-
rdd.foreach(cleanF)
299+
rdd.foreach(x => f.call(x))
299300
}
300301

301302
/**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
576577
def name(): String = rdd.name
577578

578579
/**
579-
* :: Experimental ::
580-
* The asynchronous version of the foreach action.
581-
*
582-
* @param f the function to apply to all the elements of the RDD
583-
* @return a FutureAction for the action
580+
* The asynchronous version of `count`, which returns a
581+
* future for counting the number of elements in this RDD.
584582
*/
585-
@Experimental
586-
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
587-
import org.apache.spark.SparkContext._
588-
rdd.foreachAsync(x => f.call(x))
583+
def countAsync(): JavaFutureAction[JLong] = {
584+
new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
585+
}
586+
587+
/**
588+
* The asynchronous version of `collect`, which returns a future for
589+
* retrieving a list containing all of the elements in this RDD.
590+
*/
591+
def collectAsync(): JavaFutureAction[JList[T]] = {
592+
new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
593+
}
594+
595+
/**
596+
* The asynchronous version of the `take` action, which returns a
597+
* future for retrieving the first `num` elements of this RDD.
598+
*/
599+
def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
600+
new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
589601
}
590602

603+
/**
604+
* The asynchronous version of the `foreach` action, which
605+
* applies a function f to all the elements of this RDD.
606+
*/
607+
def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
608+
new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
609+
{ x => null.asInstanceOf[Void] })
610+
}
611+
612+
/**
613+
* The asynchronous version of the `foreachPartition` action, which
614+
* applies a function f to each partition of this RDD.
615+
*/
616+
def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
617+
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
618+
{ x => null.asInstanceOf[Void] })
619+
}
591620
}

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
2424
import scala.reflect.ClassTag
2525

2626
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
27-
import org.apache.spark.annotation.Experimental
2827

2928
/**
30-
* :: Experimental ::
3129
* A set of asynchronous RDD actions available through an implicit conversion.
3230
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
3331
*/
34-
@Experimental
3532
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
3633

3734
/**

0 commit comments

Comments
 (0)