Skip to content

Commit 4351c48

Browse files
committed
[SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
1 parent 1607a5f commit 4351c48

File tree

2 files changed

+45
-49
lines changed

2 files changed

+45
-49
lines changed

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,18 +546,26 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
546546
}
547547
}
548548

549-
test("construction of RpcTimeout using properties") {
549+
test("construct RpcTimeout with conf property") {
550550
val conf = new SparkConf
551551

552552
val testProp = "spark.ask.test.timeout"
553553
val testDurationSeconds = 30
554+
val secondaryProp = "spark.ask.secondary.timeout"
554555

555556
conf.set(testProp, testDurationSeconds.toString + "s")
557+
conf.set(secondaryProp, "100s")
556558

557-
val rt = RpcTimeout(conf, testProp)
558-
assert( testDurationSeconds === rt.duration.toSeconds )
559+
// Construct RpcTimeout with a single property
560+
val rt1 = RpcTimeout(conf, testProp)
561+
assert( testDurationSeconds === rt1.duration.toSeconds )
559562

560-
val ex = intercept[Throwable] {
563+
// Construct RpcTimeout with prioritized list of properties
564+
val rt2 = RpcTimeout(conf, Seq("spark.ask.invalid.timeout", testProp, secondaryProp), "1s")
565+
assert( testDurationSeconds === rt2.duration.toSeconds )
566+
567+
// Try to construct RpcTimeout with an unconfigured property
568+
intercept[Throwable] {
561569
RpcTimeout(conf, "spark.ask.invalid.timeout")
562570
}
563571
}

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala

Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -59,72 +59,60 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
5959
}
6060
}
6161

62-
test("Future failure with RpcTimeout") {
62+
test("timeout on ask Future with RpcTimeout") {
6363

64-
class EchoActor extends Actor {
64+
class EchoActor(sleepDuration: Long) extends Actor {
6565
def receive: Receive = {
6666
case msg =>
67-
Thread.sleep(500)
67+
Thread.sleep(sleepDuration)
6868
sender() ! msg
6969
}
7070
}
7171

7272
val system = ActorSystem("EchoSystem")
73-
val echoActor = system.actorOf(Props(new EchoActor), name = "echoA")
73+
val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo")
74+
val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy")
7475

75-
val timeout = new RpcTimeout(50 millis, "spark.rpc.short.timeout")
76+
val shortProp = "spark.rpc.short.timeout"
77+
val timeout = new RpcTimeout(10 millis, shortProp)
7678

77-
val fut = echoActor.ask("hello")(1000 millis).mapTo[String].recover {
78-
case te: TimeoutException => throw timeout.amend(te)
79-
}
79+
try {
8080

81-
fut.onFailure {
82-
case te: TimeoutException => println("failed with timeout exception")
83-
}
81+
// Ask with immediate response
82+
var fut = echoActor.ask("hello")(timeout.duration).mapTo[String].
83+
recover(timeout.addMessageIfTimeout)
8484

85-
fut.onComplete {
86-
case Success(str) => println("future success")
87-
case Failure(ex) => println("future failure")
88-
}
85+
// This should complete successfully
86+
val result = timeout.awaitResult(fut)
8987

90-
println("sleeping")
91-
Thread.sleep(50)
92-
println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())
88+
assert(result.nonEmpty)
9389

94-
println("Caught TimeoutException: " +
95-
intercept[TimeoutException] {
96-
//timeout.awaitResult(fut) // prints RpcTimeout description twice
97-
Await.result(fut, 10 millis)
98-
}.getMessage()
99-
)
90+
// Ask with delayed response
91+
fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String].
92+
recover(timeout.addMessageIfTimeout)
10093

101-
/*
102-
val ref = env.setupEndpoint("test_future", new RpcEndpoint {
103-
override val rpcEnv = env
94+
// Allow future to complete with failure using plain Await.result, this will return
95+
// once the future is complete
96+
val msg1 =
97+
intercept[RpcTimeoutException] {
98+
Await.result(fut, 200 millis)
99+
}.getMessage()
104100

105-
override def receive = {
106-
case _ =>
107-
}
108-
})
109-
val conf = new SparkConf()
110-
val newRpcEnv = new AkkaRpcEnvFactory().create(
111-
RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
112-
try {
113-
val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_future")
114-
val akkaActorRef = newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef
101+
assert(msg1.contains(shortProp))
115102

116-
val timeout = new RpcTimeout(1 millis, "spark.rpc.short.timeout")
117-
val fut = akkaActorRef.ask("hello")(timeout.duration).mapTo[String]
103+
// Use RpcTimeout.awaitResult to process Future, since it has already failed with
104+
// RpcTimeoutException, the same exception should be thrown
105+
val msg2 =
106+
intercept[RpcTimeoutException] {
107+
timeout.awaitResult(fut)
108+
}.getMessage()
118109

119-
Thread.sleep(500)
120-
println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())
110+
// Ensure description is not in message twice after addMessageIfTimeout and awaitResult
111+
assert(shortProp.r.findAllIn(msg2).length === 1)
121112

122113
} finally {
123-
newRpcEnv.shutdown()
114+
system.shutdown()
124115
}
125-
*/
126-
127-
128116
}
129117

130118
}

0 commit comments

Comments
 (0)