Commit 97523e0

[SPARK-6980] Akka ask timeout description refactored to RPC layer

1 parent 97dee31 · commit 97523e0

12 files changed: +126 −51 lines
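In short, blocking RPC waits move from a bare FiniteDuration to the new RpcTimeout wrapper added in RpcEnv.scala below, so a TimeoutException can name the configuration property that controls it. A minimal sketch of the recurring before/after pattern (the helper names here are illustrative, not from the commit):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import org.apache.spark.rpc.RpcTimeout

// Before this commit: a bare duration; a timeout surfaces with no hint
// of which Spark property governs it.
def askBefore(f: Future[String], timeout: FiniteDuration): String =
  Await.result(f, timeout)

// After: RpcTimeout supplies the duration and, on timeout, rethrows the
// TimeoutException with "This timeout is controlled by <property>" appended.
def askAfter(f: Future[String], timeout: RpcTimeout): String =
  timeout.awaitResult(f)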

core/src/main/scala/org/apache/spark/deploy/Client.scala
Lines changed: 2 additions & 2 deletions

@@ -102,9 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println("... waiting before polling master for driver state")
       Thread.sleep(5000)
       println("... polling master for driver state")
-      val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
+      val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout.duration)
         .mapTo[DriverStatusResponse]
-      val statusResponse = Await.result(statusFuture, timeout)
+      val statusResponse = timeout.awaitResult(statusFuture)
       statusResponse.found match {
         case false =>
           println(s"ERROR: Cluster master did not recognize $driverId")

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
Lines changed: 3 additions & 2 deletions

@@ -194,8 +194,9 @@ private[spark] class AppClient(
     if (actor != null) {
       try {
         val timeout = RpcUtils.askTimeout(conf)
-        val future = actor.ask(StopAppClient)(timeout)
-        Await.result(future, timeout)
+        val future = actor.ask(StopAppClient)(timeout.duration)
+        // TODO(bryanc) - RpcTimeout use awaitResult ???
+        Await.result(future, timeout.duration)
       } catch {
         case e: TimeoutException =>
           logInfo("Stop request to Master timed out; it may already be shut down.")

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 2 additions & 3 deletions

@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat
 import java.util.Date

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random

@@ -940,8 +939,8 @@ private[deploy] object Master extends Logging {
     val actor = actorSystem.actorOf(
       Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
     val timeout = RpcUtils.askTimeout(conf)
-    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
-    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
+    val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration)
+    val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
   }
 }

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
Lines changed: 2 additions & 3 deletions

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

 import javax.servlet.http.HttpServletRequest

-import scala.concurrent.Await
 import scala.xml.Node

 import akka.pattern.ask

@@ -38,8 +37,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
+    val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
+    val state = timeout.awaitResult(stateFuture)
     val app = state.activeApps.find(_.id == appId).getOrElse({
       state.completedApps.find(_.id == appId).getOrElse(null)
     })

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
Lines changed: 2 additions & 3 deletions

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

 import javax.servlet.http.HttpServletRequest

-import scala.concurrent.Await
 import scala.xml.Node

 import akka.pattern.ask

@@ -36,8 +35,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private val timeout = parent.timeout

   def getMasterState: MasterStateResponse = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    Await.result(stateFuture, timeout)
+    val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
+    timeout.awaitResult(stateFuture)
   }

   override def renderJson(request: HttpServletRequest): JValue = {

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
Lines changed: 4 additions & 5 deletions

@@ -17,7 +17,6 @@

 package org.apache.spark.deploy.worker.ui

-import scala.concurrent.Await
 import scala.xml.Node

 import akka.pattern.ask

@@ -36,14 +35,14 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
   private val timeout = parent.timeout

   override def renderJson(request: HttpServletRequest): JValue = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, timeout)
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).mapTo[WorkerStateResponse]
+    val workerState = timeout.awaitResult(stateFuture)
     JsonProtocol.writeWorkerState(workerState)
   }

   def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, timeout)
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).mapTo[WorkerStateResponse]
+    val workerState = timeout.awaitResult(stateFuture)

     val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
     val runningExecutors = workerState.executors

core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
Lines changed: 8 additions & 6 deletions

@@ -17,8 +17,7 @@

 package org.apache.spark.rpc

-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
 import scala.reflect.ClassTag

 import org.apache.spark.util.RpcUtils

@@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    *
    * This method only sends the message once and never retries.
    */
-  def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

   /**
    * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to

@@ -91,15 +90,15 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
-  def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
     var lastException: Exception = null
     while (attempts < maxRetries) {
       attempts += 1
       try {
         val future = ask[T](message, timeout)
-        val result = Await.result(future, timeout)
+        val result = timeout.awaitResult(future)
         if (result == null) {
           throw new SparkException("Actor returned null")
         }

@@ -110,7 +109,10 @@
           lastException = e
           logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
       }
-      Thread.sleep(retryWaitMs)
+
+      if (attempts < maxRetries) {
+        Thread.sleep(retryWaitMs)
+      }
     }

     throw new SparkException(
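Beyond the type change, the retry loop above now sleeps only between attempts rather than after the final one. A standalone sketch of that pattern, with hypothetical names (this is not Spark API):

// Generic retry: attempt `op` up to maxRetries times, pausing only
// *between* attempts so the last failure surfaces without a trailing wait.
def retryWithWait[T](maxRetries: Int, retryWaitMs: Long)(op: => T): T = {
  var attempts = 0
  var lastException: Exception = null
  while (attempts < maxRetries) {
    attempts += 1
    try {
      return op
    } catch {
      case e: Exception => lastException = e  // remember and retry
    }
    if (attempts < maxRetries) {
      Thread.sleep(retryWaitMs)  // skipped after the final attempt
    }
  }
  throw lastException
}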

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Lines changed: 69 additions & 1 deletion

@@ -18,7 +18,10 @@
 package org.apache.spark.rpc

 import java.net.URI
+import java.util.concurrent.TimeoutException

+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.language.postfixOps

@@ -94,7 +97,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
    */
   def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
-    Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
+    Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout.duration)
   }

   /**

@@ -182,3 +185,68 @@ private[spark] object RpcAddress {
     RpcAddress(host, port)
   }
 }
+
+
+/**
+ * Associates a timeout with a configuration property so that a TimeoutException can be
+ * traced back to the controlling property.
+ * @param timeout timeout duration in seconds
+ * @param description description to be displayed in a timeout exception
+ */
+private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
+
+  /** Get the timeout duration */
+  def duration: FiniteDuration = timeout
+
+  /** Get the message associated with this timeout */
+  def message: String = description
+
+  /** Amends the standard message of TimeoutException to include the description */
+  def amend(te: TimeoutException): TimeoutException = {
+    new TimeoutException(te.getMessage() + " " + description)
+  }
+
+  /** Wait on a future result to catch and amend a TimeoutException */
+  def awaitResult[T](future: Future[T]): T = {
+    try {
+      Await.result(future, duration)
+    }
+    catch {
+      case te: TimeoutException =>
+        throw amend(te)
+    }
+  }
+
+  // TODO(bryanc) wrap Await.ready also
+}
+
+object RpcTimeout {
+
+  private[this] val messagePrefix = "This timeout is controlled by "
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @throws NoSuchElementException if property is not set
+   */
+  def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
+    new RpcTimeout(timeout, messagePrefix + timeoutProp)
+  }
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * Uses the given default value if property is not set
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @param defaultValue default timeout value in seconds if property not found
+   */
+  def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
+    new RpcTimeout(timeout, messagePrefix + timeoutProp)
+  }
+
+}
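A hedged usage sketch of the class above (the property key and default value are assumptions for illustration; a future that never completes forces the timeout path):

import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import org.apache.spark.SparkConf
import org.apache.spark.rpc.RpcTimeout

val conf = new SparkConf()
// Build an RpcTimeout from a (possibly unset) property, falling back to 120s.
val timeout = RpcTimeout(conf, "spark.rpc.askTimeout", "120s")

val never = Promise[String]().future  // a future that never completes
try {
  timeout.awaitResult(never)
} catch {
  case te: TimeoutException =>
    // The amended message now ends with:
    //   "This timeout is controlled by spark.rpc.askTimeout"
    println(te.getMessage)
}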

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
Lines changed: 3 additions & 4 deletions

@@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
 import java.util.concurrent.ConcurrentHashMap

 import scala.concurrent.Future
-import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

@@ -212,7 +211,7 @@ private[spark] class AkkaRpcEnv private[akka] (

   override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
     import actorSystem.dispatcher
-    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
+    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
       map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
   }

@@ -293,9 +292,9 @@ private[akka] class AkkaRpcEndpointRef(
     actorRef ! AkkaMessage(message, false)
   }

-  override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
     import scala.concurrent.ExecutionContext.Implicits.global
-    actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+    actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
       case msg @ AkkaMessage(message, reply) =>
         if (reply) {
           logError(s"Receive $msg but the sender cannot reply")

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
Lines changed: 6 additions & 6 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.storage

-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global

 import org.apache.spark.rpc.RpcEndpointRef

@@ -105,7 +105,7 @@ class BlockManagerMaster(
       logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
     }
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }

@@ -117,7 +117,7 @@ class BlockManagerMaster(
       logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
     }
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }

@@ -131,7 +131,7 @@ class BlockManagerMaster(
         s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
     }
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }

@@ -169,7 +169,7 @@ class BlockManagerMaster(
     val response = driverEndpoint.
       askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
+    val result = timeout.awaitResult(Future.sequence(futures))
     if (result == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }

@@ -192,7 +192,7 @@ class BlockManagerMaster(
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
     val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
-    Await.result(future, timeout)
+    timeout.awaitResult(future)
   }

   /** Stop the driver endpoint, called only on the Spark driver node */
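Note that in the two composite cases above, awaitResult applies the config-aware timeout to a combined future just as readily as to a single one. A tiny illustration (the futures and `timeout` value are assumed):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative: the timeout covers the whole batch, and a TimeoutException
// raised here would still name the controlling Spark property.
val futures = Seq(Future(1), Future(2), Future(3))
val results: Seq[Int] = timeout.awaitResult(Future.sequence(futures))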
