Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 3a168c7

Browse files
committed
[SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
1 parent 7636189 commit 3a168c7

File tree

2 files changed

+60
-69
lines changed

2 files changed

+60
-69
lines changed

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,66 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
575575
}
576576
}
577577

578+
test("ask a message timeout on Future using RpcTimeout") {
579+
case class SleepyReply(msg: String)
580+
581+
val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint {
582+
override val rpcEnv = env
583+
584+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
585+
case msg: String => {
586+
context.reply(msg)
587+
}
588+
case sr: SleepyReply => {
589+
Thread.sleep(50)
590+
context.reply(sr.msg)
591+
}
592+
}
593+
})
594+
595+
val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
596+
val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
597+
598+
// Ask with immediate response, should complete successfully
599+
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
600+
val reply1 = longTimeout.awaitResult(fut1)
601+
assert("hello" === reply1)
602+
603+
// Ask with a delayed response and wait for response immediately that should timeout
604+
val fut2 = rpcEndpointRef.ask[String](SleepyReply("doh"), shortTimeout)
605+
val reply2 =
606+
intercept[RpcTimeoutException] {
607+
shortTimeout.awaitResult(fut2)
608+
}.getMessage
609+
610+
// RpcTimeout.awaitResult should have added the property to the TimeoutException message
611+
assert(reply2.contains(shortTimeout.timeoutProp))
612+
613+
// Ask with delayed response and allow the Future to timeout before Await.result
614+
val fut3 = rpcEndpointRef.ask[String](SleepyReply("goodbye"),shortTimeout)
615+
616+
// Allow future to complete with failure using plain Await.result, this will return
617+
// once the future is complete to verify addMessageIfTimeout was invoked
618+
val reply3 =
619+
intercept[RpcTimeoutException] {
620+
Await.result(fut3, 200 millis)
621+
}.getMessage
622+
623+
// When the future timed out, the recover callback should have used
624+
// RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
625+
assert(reply3.contains(shortTimeout.timeoutProp))
626+
627+
// Use RpcTimeout.awaitResult to process Future, since it has already failed with
628+
// RpcTimeoutException, the same RpcTimeoutException should be thrown
629+
val reply4 =
630+
intercept[RpcTimeoutException] {
631+
shortTimeout.awaitResult(fut3)
632+
}.getMessage
633+
634+
// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
635+
assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1)
636+
}
637+
578638
}
579639

580640
class UnserializableClass

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -56,73 +56,4 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
5656
}
5757
}
5858

59-
test("timeout on ask Future with RpcTimeout") {
60-
61-
class EchoActor(sleepDuration: Long) extends Actor {
62-
def receive: Receive = {
63-
case msg =>
64-
Thread.sleep(sleepDuration)
65-
sender() ! msg
66-
}
67-
}
68-
69-
val akkaConf = ConfigFactory.empty().withValue("akka.log-dead-letters",
70-
ConfigValueFactory.fromAnyRef("off"))
71-
val system = ActorSystem("EchoSystem", akkaConf)
72-
val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo")
73-
val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy")
74-
75-
val longProp = "spark.rpc.long.timeout"
76-
val longTimeout = new RpcTimeout(1 second, longProp)
77-
val shortProp = "spark.rpc.short.timeout"
78-
val shortTimeout = new RpcTimeout(10 millis, shortProp)
79-
80-
try {
81-
82-
// Ask with immediate response
83-
var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String].
84-
recover(longTimeout.addMessageIfTimeout)
85-
86-
// This should complete successfully
87-
val result = longTimeout.awaitResult(fut)
88-
89-
assert(result.nonEmpty)
90-
91-
// Ask with a delayed response and wait for response immediately that should timeout
92-
fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String]
93-
val msg1 =
94-
intercept[RpcTimeoutException] {
95-
shortTimeout.awaitResult(fut)
96-
}.getMessage()
97-
98-
assert(msg1.contains(shortProp))
99-
100-
// Ask with delayed response using addMessageIfTimeout in recover callback
101-
fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String].
102-
recover(shortTimeout.addMessageIfTimeout)
103-
104-
// Allow future to complete with failure using plain Await.result, this will return
105-
// once the future is complete to verify addMessageIfTimeout was invoked
106-
val msg2 =
107-
intercept[RpcTimeoutException] {
108-
Await.result(fut, 200 millis)
109-
}.getMessage()
110-
111-
assert(msg2.contains(shortProp))
112-
113-
// Use RpcTimeout.awaitResult to process Future, since it has already failed with
114-
// RpcTimeoutException, the same exception should be thrown
115-
val msg3 =
116-
intercept[RpcTimeoutException] {
117-
shortTimeout.awaitResult(fut)
118-
}.getMessage()
119-
120-
// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
121-
assert(shortProp.r.findAllIn(msg3).length === 1)
122-
123-
} finally {
124-
system.shutdown()
125-
}
126-
}
127-
12859
}

0 commit comments

Comments
 (0)