-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer) #6205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
97523e0
78a2c0a
5b59a44
49f9f04
23d2f26
a294569
f74064d
0ee5642
4be3a8d
b7fb99f
c07d05c
235919b
2f94095
1607a5f
4351c48
7774d56
995d196
d3754d1
08f5afc
1b9beab
2206b4d
1517721
1394de6
c6cfd33
b05d449
fa6ed82
fadaf6f
218aa50
039afed
be11c4e
7636189
3a168c7
3d8b1ff
287059a
7f4d78e
6a1c50d
4e89c75
dbd5f73
7bb70f1
06afa53
46c8d48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
package org.apache.spark.rpc | ||
|
||
import java.net.URI | ||
import java.util.concurrent.TimeoutException | ||
|
||
import scala.concurrent.{Await, Future} | ||
import scala.concurrent.{Awaitable, Await, Future} | ||
import scala.concurrent.duration._ | ||
import scala.language.postfixOps | ||
|
||
import org.apache.spark.{SecurityManager, SparkConf} | ||
|
@@ -66,7 +68,7 @@ private[spark] object RpcEnv { | |
*/ | ||
private[spark] abstract class RpcEnv(conf: SparkConf) { | ||
|
||
private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf) | ||
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf) | ||
|
||
/** | ||
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement | ||
|
@@ -94,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) { | |
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action. | ||
*/ | ||
def setupEndpointRefByURI(uri: String): RpcEndpointRef = { | ||
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout) | ||
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri)) | ||
} | ||
|
||
/** | ||
|
@@ -184,3 +186,107 @@ private[spark] object RpcAddress { | |
RpcAddress(host, port) | ||
} | ||
} | ||
|
||
|
||
/** | ||
* An exception thrown if RpcTimeout modifies a [[TimeoutException]]. | ||
*/ | ||
private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException) | ||
extends TimeoutException(message) { initCause(cause) } | ||
|
||
|
||
/** | ||
* Associates a timeout with a description so that when a TimeoutException occurs, additional | ||
* context about the timeout can be amended to the exception message. | ||
* @param duration timeout duration in seconds | ||
* @param timeoutProp the configuration property that controls this timeout | ||
*/ | ||
private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) | ||
extends Serializable { | ||
|
||
/** Amends the standard message of TimeoutException to include the description */ | ||
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = { | ||
new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te) | ||
} | ||
|
||
/** | ||
* PartialFunction to match a TimeoutException and add the timeout description to the message | ||
* | ||
* @note This can be used in the recover callback of a Future to add to a TimeoutException | ||
* Example: | ||
* val timeout = new RpcTimeout(5 millis, "short timeout") | ||
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout) | ||
*/ | ||
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = { | ||
// The exception has already been converted to a RpcTimeoutException so just raise it | ||
case rte: RpcTimeoutException => throw rte | ||
// Any other TimeoutException gets converted to a RpcTimeoutException with modified message | ||
case te: TimeoutException => throw createRpcTimeoutException(te) | ||
} | ||
|
||
/** | ||
* Wait for the completed result and return it. If the result is not available within this | ||
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout. | ||
* @param awaitable the `Awaitable` to be awaited | ||
* @throws RpcTimeoutException if after waiting for the specified time `awaitable` | ||
* is still not ready | ||
*/ | ||
def awaitResult[T](awaitable: Awaitable[T]): T = { | ||
try { | ||
Await.result(awaitable, duration) | ||
} catch addMessageIfTimeout | ||
} | ||
} | ||
|
||
|
||
private[spark] object RpcTimeout { | ||
|
||
/** | ||
* Lookup the timeout property in the configuration and create | ||
* a RpcTimeout with the property key in the description. | ||
* @param conf configuration properties containing the timeout | ||
* @param timeoutProp property key for the timeout in seconds | ||
* @throws NoSuchElementException if property is not set | ||
*/ | ||
def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = { | ||
val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds } | ||
new RpcTimeout(timeout, timeoutProp) | ||
} | ||
|
||
/** | ||
* Lookup the timeout property in the configuration and create | ||
* a RpcTimeout with the property key in the description. | ||
* Uses the given default value if property is not set | ||
* @param conf configuration properties containing the timeout | ||
* @param timeoutProp property key for the timeout in seconds | ||
* @param defaultValue default timeout value in seconds if property not found | ||
*/ | ||
def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = { | ||
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as mentioned above, I don't think so -- lets stick w/ the current behavior of using |
||
new RpcTimeout(timeout, timeoutProp) | ||
} | ||
|
||
/** | ||
* Lookup prioritized list of timeout properties in the configuration | ||
* and create a RpcTimeout with the first set property key in the | ||
* description. | ||
* Uses the given default value if property is not set | ||
* @param conf configuration properties containing the timeout | ||
* @param timeoutPropList prioritized list of property keys for the timeout in seconds | ||
* @param defaultValue default timeout value in seconds if no properties found | ||
*/ | ||
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Creating |
||
require(timeoutPropList.nonEmpty) | ||
|
||
// Find the first set property or use the default value with the first property | ||
val itr = timeoutPropList.iterator | ||
var foundProp: Option[(String, String)] = None | ||
while (itr.hasNext && foundProp.isEmpty){ | ||
val propKey = itr.next() | ||
conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) } | ||
} | ||
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never mind. Looks it's long too. |
||
val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds } | ||
new RpcTimeout(timeout, finalProp._1) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka | |
import java.util.concurrent.ConcurrentHashMap | ||
|
||
import scala.concurrent.Future | ||
import scala.concurrent.duration._ | ||
import scala.language.postfixOps | ||
import scala.reflect.ClassTag | ||
import scala.util.control.NonFatal | ||
|
@@ -214,8 +213,11 @@ private[spark] class AkkaRpcEnv private[akka] ( | |
|
||
override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = { | ||
import actorSystem.dispatcher | ||
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout). | ||
map(new AkkaRpcEndpointRef(defaultAddress, _, conf)) | ||
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. though this doesn't follow the same pattern of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel there are two ways :
In my opinion 2nd one is the best solution as the previous one will require modifications where ever the usages are. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The timeout is a little trickier for Futures. From what I understand, creating the future is non-blocking, so we can't just call I think we might be able to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what you are describing sounds reasonable. We certainly don't want to change the return type or await on the result here. You could potentially fix the doubly-appended msg by throwing a subclass of (Honestly I thiink it will also be fine to not stress too much about this case, it may not be worth it.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm going to do some research on this and see what our options are while still keeping the same return type. |
||
map(new AkkaRpcEndpointRef(defaultAddress, _, conf)). | ||
// this is just in case there is a timeout from creating the future in resolveOne, we want the | ||
// exception to indicate the conf that determines the timeout | ||
recover(defaultLookupTimeout.addMessageIfTimeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the usage with |
||
} | ||
|
||
override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = { | ||
|
@@ -295,8 +297,8 @@ private[akka] class AkkaRpcEndpointRef( | |
actorRef ! AkkaMessage(message, false) | ||
} | ||
|
||
override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = { | ||
actorRef.ask(AkkaMessage(message, true))(timeout).flatMap { | ||
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { | ||
actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same thing here about catching the timeout |
||
// The function will run in the calling thread, so it should be short and never block. | ||
case msg @ AkkaMessage(message, reply) => | ||
if (reply) { | ||
|
@@ -307,7 +309,8 @@ private[akka] class AkkaRpcEndpointRef( | |
} | ||
case AkkaFailure(e) => | ||
Future.failed(e) | ||
}(ThreadUtils.sameThread).mapTo[T] | ||
}(ThreadUtils.sameThread).mapTo[T]. | ||
recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) | ||
} | ||
|
||
override def toString: String = s"${getClass.getSimpleName}($actorRef)" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you don't need to import
FiniteDuration
explicitly since you're importing duration._
. Also you are supposed to order direct class imports before package imports (not just alphabetically), so it should be: the IntelliJ import organizer will get this wrong, but Aaron Davidson wrote a plugin which does it right -- there are instructions for using it here: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, using the plugin now