
Commit 31dbe69
Add spark.rpc.* and deprecate spark.akka.*
1 parent 6fe690d

15 files changed: +83 -52 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 1 deletion
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.am.waitTime" -> Seq(
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
         // Translate old value to a duration, with 10s wait time per try.
-        translation = s => s"${s.toLong * 10}s"))
+        translation = s => s"${s.toLong * 10}s")),
+    "spark.rpc.num.retries" -> Seq(
+      AlternateConfig("spark.akka.num.retries", "1.4")),
+    "spark.rpc.retry.wait" -> Seq(
+      AlternateConfig("spark.akka.retry.wait", "1.4")),
+    "spark.rpc.askTimeout" -> Seq(
+      AlternateConfig("spark.akka.askTimeout", "1.4")),
+    "spark.rpc.lookupTimeout" -> Seq(
+      AlternateConfig("spark.akka.lookupTimeout", "1.4"))
   )

   /**
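
For reference, this aliasing means a job that still sets the old 1.3-era Akka keys keeps working: assuming SparkConf's existing deprecated-config lookup (which predates this commit and is not part of this hunk), a read through the new spark.rpc.* key falls back to the old value and logs a deprecation warning. A REPL-style sketch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    conf.set("spark.akka.askTimeout", "60")  // deprecated key, still accepted
    conf.get("spark.rpc.askTimeout")         // resolves to "60" via AlternateConfig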

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 3 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}

 /**
  * Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   extends Actor with ActorLogReceive with Logging {

   var masterActor: ActorSelection = _
-  val timeout = AkkaUtils.askTimeout(conf)
+  val timeout = RpcUtils.askTimeout(conf)

   override def preStart(): Unit = {
     masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
     if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
       conf.set("spark.akka.logLifecycleEvents", "true")
     }
-    conf.set("spark.akka.askTimeout", "10")
+    conf.set("spark.rpc.askTimeout", "10")
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}

 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
   def stop() {
     if (actor != null) {
       try {
-        val timeout = AkkaUtils.askTimeout(conf)
+        val timeout = RpcUtils.askTimeout(conf)
         val future = actor.ask(StopAppClient)(timeout)
         Await.result(future, timeout)
       } catch {

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}

 private[master] class Master(
     host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
       securityManager = securityMgr)
     val actor = actorSystem.actorOf(
       Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
-    val timeout = AkkaUtils.askTimeout(conf)
+    val timeout = RpcUtils.askTimeout(conf)
     val portsRequest = actor.ask(BoundPortsRequest)(timeout)
     val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

 /**
  * Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
   extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {

   val masterActorRef = master.self
-  val timeout = AkkaUtils.askTimeout(master.conf)
+  val timeout = RpcUtils.askTimeout(master.conf)
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

   initialize()

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 4 additions & 4 deletions
@@ -32,7 +32,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
 import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
 import org.apache.spark.deploy.ClientArguments._

@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
   }

   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val askTimeout = AkkaUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
     val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
   }

   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val askTimeout = AkkaUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
       responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
-        val askTimeout = AkkaUtils.askTimeout(conf)
+        val askTimeout = RpcUtils.askTimeout(conf)
         val driverDescription = buildDriverDescription(submitRequest)
         val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

 /**
  * Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {

-  private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
+  private[ui] val timeout = RpcUtils.askTimeout(worker.conf)

   initialize()

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 5 additions & 5 deletions
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import scala.reflect.ClassTag

 import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}

 /**
  * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
@@ -38,7 +38,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark] abstract class RpcEnv(conf: SparkConf) {

-  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+  private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)

   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint
 private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
   extends Serializable with Logging {

-  private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
-  private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
-  private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
+  private[this] val maxRetries = RpcUtils.numRetries(conf)
+  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)

   /**
    * return the address for the [[RpcEndpointRef]]
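
RpcUtils.scala itself is among the 15 changed files but falls outside this excerpt. Judging from the defaults in the inline code replaced above, its helpers plausibly look like the following sketch (an assumption, not the verbatim new file):

    import scala.concurrent.duration._

    import org.apache.spark.SparkConf

    private[spark] object RpcUtils {

      /** Number of times to retry an RPC ask (old inline default: 3). */
      def numRetries(conf: SparkConf): Int =
        conf.getInt("spark.rpc.num.retries", 3)

      /** Milliseconds to wait between retries (old inline default: 3000). */
      def retryWaitMs(conf: SparkConf): Long =
        conf.getLong("spark.rpc.retry.wait", 3000)

      /** Default timeout for RPC ask operations (old inline default: 30 seconds). */
      def askTimeout(conf: SparkConf): FiniteDuration =
        conf.getLong("spark.rpc.askTimeout", 30).seconds

      /** Default timeout for remote-endpoint lookups (assumed to mirror the 30s ask default). */
      def lookupTimeout(conf: SparkConf): FiniteDuration =
        conf.getLong("spark.rpc.lookupTimeout", 30).seconds
    }

With the SparkConf deprecation map above in place, reading these spark.rpc.* keys also picks up values still set under the old spark.akka.* names.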

core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}

 import scala.util.control.NonFatal

@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))

-  private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+  private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)

   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils

 private[spark]
 class BlockManagerMaster(
@@ -32,7 +32,7 @@ class BlockManagerMaster(
     isDriver: Boolean)
   extends Logging {

-  val timeout = AkkaUtils.askTimeout(conf)
+  val timeout = RpcUtils.askTimeout(conf)

   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
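
For context, the retry settings re-homed by this commit feed an ask-with-retries pattern like the one RpcEndpointRef's fields above suggest. A minimal, hypothetical sketch of that pattern (the real loop lives in RpcEndpointRef/AkkaUtils, outside this excerpt):

    import scala.concurrent.Await

    import akka.actor.ActorRef
    import akka.pattern.ask

    import org.apache.spark.{SparkConf, SparkException}
    import org.apache.spark.util.RpcUtils

    object AskExample {
      // Hypothetical helper: retry the ask up to numRetries times, pausing
      // retryWaitMs between attempts, bounding each attempt by askTimeout.
      def askWithRetry[T](actor: ActorRef, message: Any, conf: SparkConf): T = {
        val maxRetries = RpcUtils.numRetries(conf)
        val retryWaitMs = RpcUtils.retryWaitMs(conf)
        val timeout = RpcUtils.askTimeout(conf)
        var lastError: Throwable = null
        for (attempt <- 1 to maxRetries) {
          try {
            return Await.result(actor.ask(message)(timeout), timeout).asInstanceOf[T]
          } catch {
            case e: Exception =>
              lastError = e
              Thread.sleep(retryWaitMs)
          }
        }
        throw new SparkException(s"Error sending message after $maxRetries attempts", lastError)
      }
    }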
