@@ -155,16 +155,23 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
155
155
})
156
156
157
157
val conf = new SparkConf ()
158
+ val shortProp = " spark.rpc.short.timeout"
158
159
conf.set(" spark.rpc.retry.wait" , " 0" )
159
160
conf.set(" spark.rpc.numRetries" , " 1" )
160
161
val anotherEnv = createRpcEnv(conf, " remote" , 13345 )
161
162
// Use anotherEnv to find out the RpcEndpointRef
162
163
val rpcEndpointRef = anotherEnv.setupEndpointRef(" local" , env.address, " ask-timeout" )
163
164
try {
164
165
val e = intercept[Exception ] {
165
- rpcEndpointRef.askWithRetry[String ](" hello" , 1 millis)
166
+ rpcEndpointRef.askWithRetry[String ](" hello" , new RpcTimeout ( 1 millis, shortProp) )
166
167
}
167
168
assert(e.isInstanceOf [TimeoutException ] || e.getCause.isInstanceOf [TimeoutException ])
169
+ e match {
170
+ case te : TimeoutException =>
171
+ assert(te.getMessage().contains(shortProp))
172
+ case e : Exception =>
173
+ assert(e.getCause().getMessage().contains(shortProp))
174
+ }
168
175
} finally {
169
176
anotherEnv.shutdown()
170
177
anotherEnv.awaitTermination()
@@ -539,6 +546,22 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
539
546
}
540
547
}
541
548
549
+ test(" construction of RpcTimeout using properties" ) {
550
+ val conf = new SparkConf
551
+
552
+ val testProp = " spark.ask.test.timeout"
553
+ val testDurationSeconds = 30
554
+
555
+ conf.set(testProp, testDurationSeconds.toString + " s" )
556
+
557
+ val rt = RpcTimeout (conf, testProp)
558
+ assert( testDurationSeconds === rt.duration.toSeconds )
559
+
560
+ val ex = intercept[Throwable ] {
561
+ RpcTimeout (conf, " spark.ask.invalid.timeout" )
562
+ }
563
+ }
564
+
542
565
}
543
566
544
567
class UnserializableClass
0 commit comments