@@ -56,73 +56,4 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
56
56
}
57
57
}
58
58
59
- test(" timeout on ask Future with RpcTimeout" ) {
60
-
61
- class EchoActor (sleepDuration : Long ) extends Actor {
62
- def receive : Receive = {
63
- case msg =>
64
- Thread .sleep(sleepDuration)
65
- sender() ! msg
66
- }
67
- }
68
-
69
- val akkaConf = ConfigFactory .empty().withValue(" akka.log-dead-letters" ,
70
- ConfigValueFactory .fromAnyRef(" off" ))
71
- val system = ActorSystem (" EchoSystem" , akkaConf)
72
- val echoActor = system.actorOf(Props (new EchoActor (0 )), name = " echo" )
73
- val sleepyActor = system.actorOf(Props (new EchoActor (50 )), name = " sleepy" )
74
-
75
- val longProp = " spark.rpc.long.timeout"
76
- val longTimeout = new RpcTimeout (1 second, longProp)
77
- val shortProp = " spark.rpc.short.timeout"
78
- val shortTimeout = new RpcTimeout (10 millis, shortProp)
79
-
80
- try {
81
-
82
- // Ask with immediate response
83
- var fut = echoActor.ask(" hello" )(longTimeout.duration).mapTo[String ].
84
- recover(longTimeout.addMessageIfTimeout)
85
-
86
- // This should complete successfully
87
- val result = longTimeout.awaitResult(fut)
88
-
89
- assert(result.nonEmpty)
90
-
91
- // Ask with a delayed response and wait for response immediately that should timeout
92
- fut = sleepyActor.ask(" doh" )(shortTimeout.duration).mapTo[String ]
93
- val msg1 =
94
- intercept[RpcTimeoutException ] {
95
- shortTimeout.awaitResult(fut)
96
- }.getMessage()
97
-
98
- assert(msg1.contains(shortProp))
99
-
100
- // Ask with delayed response using addMessageIfTimeout in recover callback
101
- fut = sleepyActor.ask(" goodbye" )(shortTimeout.duration).mapTo[String ].
102
- recover(shortTimeout.addMessageIfTimeout)
103
-
104
- // Allow future to complete with failure using plain Await.result, this will return
105
- // once the future is complete to verify addMessageIfTimeout was invoked
106
- val msg2 =
107
- intercept[RpcTimeoutException ] {
108
- Await .result(fut, 200 millis)
109
- }.getMessage()
110
-
111
- assert(msg2.contains(shortProp))
112
-
113
- // Use RpcTimeout.awaitResult to process Future, since it has already failed with
114
- // RpcTimeoutException, the same exception should be thrown
115
- val msg3 =
116
- intercept[RpcTimeoutException ] {
117
- shortTimeout.awaitResult(fut)
118
- }.getMessage()
119
-
120
- // Ensure description is not in message twice after addMessageIfTimeout and awaitResult
121
- assert(shortProp.r.findAllIn(msg3).length === 1 )
122
-
123
- } finally {
124
- system.shutdown()
125
- }
126
- }
127
-
128
59
}
0 commit comments