Commit fadaf6f

[SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTimeout to fix MiMa errors
1 parent fa6ed82 commit fadaf6f

File tree

11 files changed (+27 −15 lines)

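Context for the diffs below: SPARK-6980 replaced the bare FiniteDuration accessors in RpcUtils with askRpcTimeout/lookupRpcTimeout, which return an RpcTimeout wrapper, and this commit restores the old names as deprecated shims so MiMa's binary-compatibility check against the previous release passes. A minimal sketch of what the restored RpcUtils methods might look like; the config keys, defaults, and deprecation version are assumptions, since the RpcUtils.scala hunk itself is not shown here:

    import scala.concurrent.duration.FiniteDuration

    private[spark] object RpcUtils {
      // New-style accessors used by the call sites below; they return the
      // RpcTimeout wrapper rather than a bare duration.
      def askRpcTimeout(conf: SparkConf): RpcTimeout =
        RpcTimeout(conf, "spark.rpc.askTimeout", "120s") // key/default assumed

      def lookupRpcTimeout(conf: SparkConf): RpcTimeout =
        RpcTimeout(conf, "spark.rpc.lookupTimeout", "120s") // key/default assumed

      // Restored deprecated shims: same names and FiniteDuration return types
      // as before, so previously compiled callers still link, which is exactly
      // what the MiMa check verifies.
      @deprecated("use askRpcTimeout instead", "1.4.0") // version assumed
      def askTimeout(conf: SparkConf): FiniteDuration =
        askRpcTimeout(conf).duration

      @deprecated("use lookupRpcTimeout instead", "1.4.0") // version assumed
      def lookupTimeout(conf: SparkConf): FiniteDuration =
        lookupRpcTimeout(conf).duration
    }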

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   private val lostMasters = new HashSet[Address]
   private var activeMasterActor: ActorSelection = null

-  val timeout = RpcUtils.askTimeout(conf)
+  val timeout = RpcUtils.askRpcTimeout(conf)

   override def preStart(): Unit = {
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 1 deletion

@@ -193,7 +193,7 @@ private[spark] class AppClient(
   def stop() {
     if (actor != null) {
       try {
-        val timeout = RpcUtils.askTimeout(conf)
+        val timeout = RpcUtils.askRpcTimeout(conf)
         val future = actor.ask(StopAppClient)(timeout.duration)
         timeout.awaitResult(future)
       } catch {
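The stop() hunk above also shows the calling convention the rename enables: askRpcTimeout returns an object exposing both the raw duration (passed to actor.ask) and an awaitResult helper. A self-contained sketch of such a wrapper, assuming the real RpcTimeout annotates timeout failures with the controlling config property:

    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.FiniteDuration

    // Sketch of a timeout wrapper matching the call sites in this commit:
    // `duration` feeds ask(...), and `awaitResult` wraps Await.result,
    // rethrowing timeouts tagged with the property that configured them
    // (assumed behavior, not shown in this diff).
    class RpcTimeout(val duration: FiniteDuration, timeoutProp: String) {
      def awaitResult[T](future: Future[T]): T =
        try {
          Await.result(future, duration)
        } catch {
          case te: TimeoutException =>
            throw new TimeoutException(
              s"${te.getMessage}. This timeout is controlled by $timeoutProp")
        }
    }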

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion

@@ -938,7 +938,7 @@ private[deploy] object Master extends Logging {
       securityManager = securityMgr)
     val actor = actorSystem.actorOf(
       Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
-    val timeout = RpcUtils.askTimeout(conf)
+    val timeout = RpcUtils.askRpcTimeout(conf)
     val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration)
     val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
   with UIRoot {

   val masterActorRef = master.self
-  val timeout = RpcUtils.askTimeout(master.conf)
+  val timeout = RpcUtils.askRpcTimeout(master.conf)
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

   val masterPage = new MasterPage(this)

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 3 additions & 3 deletions

@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: Sp
   extends KillRequestServlet {

   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val askTimeout = RpcUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askRpcTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
     val k = new KillSubmissionResponse

@@ -90,7 +90,7 @@ private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf:
   extends StatusRequestServlet {

   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val askTimeout = RpcUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askRpcTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }

@@ -175,7 +175,7 @@ private[rest] class StandaloneSubmitRequestServlet(
       responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
-        val askTimeout = RpcUtils.askTimeout(conf)
+        val askTimeout = RpcUtils.askRpcTimeout(conf)
         val driverDescription = buildDriverDescription(submitRequest)
         val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {

-  private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
+  private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)

   initialize()

core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala

Lines changed: 2 additions & 1 deletion

@@ -31,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)

   private[this] val maxRetries = RpcUtils.numRetries(conf)
   private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
-  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

   /**
    * return the address for the [[RpcEndpointRef]]

@@ -118,4 +118,5 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
       throw new SparkException(
         s"Error sending message [message = $message]", lastException)
     }
+
 }

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@ private[spark] object RpcEnv {
  */
 private[spark] abstract class RpcEnv(conf: SparkConf) {

-  private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
+  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class BlockManagerMaster(
     isDriver: Boolean)
   extends Logging {

-  val timeout = RpcUtils.askTimeout(conf)
+  val timeout = RpcUtils.askRpcTimeout(conf)

   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 2 additions & 2 deletions

@@ -197,7 +197,7 @@ private[spark] object AkkaUtils extends Logging {
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
     val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
-    val timeout = RpcUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupRpcTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
   }

@@ -211,7 +211,7 @@ private[spark] object AkkaUtils extends Logging {
     val executorActorSystemName = SparkEnv.executorActorSystemName
     Utils.checkHost(host, "Expected hostname")
     val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
-    val timeout = RpcUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupRpcTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
   }
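Why restoring the methods fixes the build: MiMa compares the compiled artifacts against the last released version, and a public method that disappears or changes its return type is reported as a binary incompatibility. The alternative would have been to silence the report with an exclusion; a hypothetical filter of the kind Spark keeps in project/MimaExcludes.scala might look roughly like this (NOT part of this commit, shown only for contrast):

    import com.typesafe.tools.mima.core._

    // Hypothetical exclusion: this would suppress the missing-method report
    // instead of actually restoring binary compatibility.
    ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.util.RpcUtils.askTimeout")

Keeping the deprecated shims is the safer choice here, since third-party code compiled against the previous release keeps linking without any filter.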
