@@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
249
249
def run () {
250
250
try {
251
251
while (! selectorThread.isInterrupted) {
252
- while (! registerRequests.isEmpty) {
252
+ while (! registerRequests.isEmpty) {
253
253
val conn : SendingConnection = registerRequests.dequeue()
254
254
addListeners(conn)
255
255
conn.connect()
@@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
308
308
// Some keys within the selectors list are invalid/closed. clear them.
309
309
val allKeys = selector.keys().iterator()
310
310
311
- while (allKeys.hasNext() ) {
311
+ while (allKeys.hasNext) {
312
312
val key = allKeys.next()
313
313
try {
314
314
if (! key.isValid) {
@@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
341
341
342
342
if (0 != selectedKeysCount) {
343
343
val selectedKeys = selector.selectedKeys().iterator()
344
- while (selectedKeys.hasNext() ) {
344
+ while (selectedKeys.hasNext) {
345
345
val key = selectedKeys.next
346
346
selectedKeys.remove()
347
347
try {
@@ -419,62 +419,62 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
419
419
connectionsByKey -= connection.key
420
420
421
421
try {
422
- if ( connection. isInstanceOf [ SendingConnection ]) {
423
- val sendingConnection = connection. asInstanceOf [ SendingConnection ]
424
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425
- logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
422
+ connection match {
423
+ case sendingConnection : SendingConnection =>
424
+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425
+ logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
426
426
427
- connectionsById -= sendingConnectionManagerId
428
- connectionsAwaitingSasl -= connection.connectionId
427
+ connectionsById -= sendingConnectionManagerId
428
+ connectionsAwaitingSasl -= connection.connectionId
429
429
430
- messageStatuses.synchronized {
431
- messageStatuses
432
- .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
430
+ messageStatuses.synchronized {
431
+ messageStatuses
432
+ .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
433
433
logInfo(" Notifying " + status)
434
434
status.synchronized {
435
- status.attempted = true
436
- status.acked = false
437
- status.markDone()
435
+ status.attempted = true
436
+ status.acked = false
437
+ status.markDone()
438
438
}
439
439
})
440
440
441
- messageStatuses.retain((i, status) => {
442
- status.connectionManagerId != sendingConnectionManagerId
443
- })
444
- }
445
- } else if (connection.isInstanceOf [ReceivingConnection ]) {
446
- val receivingConnection = connection.asInstanceOf [ReceivingConnection ]
447
- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
448
- logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
449
-
450
- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
451
- if (! sendingConnectionOpt.isDefined) {
452
- logError(" Corresponding SendingConnectionManagerId not found" )
453
- return
454
- }
441
+ messageStatuses.retain((i, status) => {
442
+ status.connectionManagerId != sendingConnectionManagerId
443
+ })
444
+ }
445
+ case receivingConnection : ReceivingConnection =>
446
+ val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
447
+ logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
455
448
456
- val sendingConnection = sendingConnectionOpt.get
457
- connectionsById -= remoteConnectionManagerId
458
- sendingConnection.close()
449
+ val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
450
+ if (! sendingConnectionOpt.isDefined) {
451
+ logError(" Corresponding SendingConnectionManagerId not found" )
452
+ return
453
+ }
459
454
460
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
455
+ val sendingConnection = sendingConnectionOpt.get
456
+ connectionsById -= remoteConnectionManagerId
457
+ sendingConnection.close()
461
458
462
- assert ( sendingConnectionManagerId == remoteConnectionManagerId )
459
+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId( )
463
460
464
- messageStatuses.synchronized {
465
- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
466
- logInfo(" Notifying " + s)
467
- s.synchronized {
468
- s.attempted = true
469
- s.acked = false
470
- s.markDone()
461
+ assert(sendingConnectionManagerId == remoteConnectionManagerId)
462
+
463
+ messageStatuses.synchronized {
464
+ for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
465
+ logInfo(" Notifying " + s)
466
+ s.synchronized {
467
+ s.attempted = true
468
+ s.acked = false
469
+ s.markDone()
470
+ }
471
471
}
472
- }
473
472
474
- messageStatuses.retain((i, status) => {
475
- status.connectionManagerId != sendingConnectionManagerId
476
- })
477
- }
473
+ messageStatuses.retain((i, status) => {
474
+ status.connectionManagerId != sendingConnectionManagerId
475
+ })
476
+ }
477
+ case _ => logError(" Unsupported type of connection." )
478
478
}
479
479
} finally {
480
480
// So that the selection keys can be removed.
@@ -517,13 +517,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
517
517
logDebug(" Client sasl completed for id: " + waitingConn.connectionId)
518
518
connectionsAwaitingSasl -= waitingConn.connectionId
519
519
waitingConn.getAuthenticated().synchronized {
520
- waitingConn.getAuthenticated().notifyAll();
520
+ waitingConn.getAuthenticated().notifyAll()
521
521
}
522
522
return
523
523
} else {
524
524
var replyToken : Array [Byte ] = null
525
525
try {
526
- replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
526
+ replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
527
527
if (waitingConn.isSaslComplete()) {
528
528
logDebug(" Client sasl completed after evaluate for id: " + waitingConn.connectionId)
529
529
connectionsAwaitingSasl -= waitingConn.connectionId
@@ -533,7 +533,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
533
533
return
534
534
}
535
535
val securityMsgResp = SecurityMessage .fromResponse(replyToken,
536
- securityMsg.getConnectionId.toString() )
536
+ securityMsg.getConnectionId.toString)
537
537
val message = securityMsgResp.toBufferMessage
538
538
if (message == null ) throw new Exception (" Error creating security message" )
539
539
sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
@@ -630,13 +630,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
630
630
case bufferMessage : BufferMessage => {
631
631
if (authEnabled) {
632
632
val res = handleAuthentication(connection, bufferMessage)
633
- if (res == true ) {
633
+ if (res) {
634
634
// message was security negotiation so skip the rest
635
635
logDebug(" After handleAuth result was true, returning" )
636
636
return
637
637
}
638
638
}
639
- if (bufferMessage.hasAckId) {
639
+ if (bufferMessage.hasAckId() ) {
640
640
val sentMessageStatus = messageStatuses.synchronized {
641
641
messageStatuses.get(bufferMessage.ackId) match {
642
642
case Some (status) => {
@@ -646,7 +646,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
646
646
case None => {
647
647
throw new Exception (" Could not find reference for received ack message " +
648
648
message.id)
649
- null
650
649
}
651
650
}
652
651
}
@@ -668,7 +667,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
668
667
if (ackMessage.isDefined) {
669
668
if (! ackMessage.get.isInstanceOf [BufferMessage ]) {
670
669
logDebug(" Response to " + bufferMessage + " is not a buffer message, it is of type "
671
- + ackMessage.get.getClass() )
670
+ + ackMessage.get.getClass)
672
671
} else if (! ackMessage.get.asInstanceOf [BufferMessage ].hasAckId) {
673
672
logDebug(" Response to " + bufferMessage + " does not have ack id set" )
674
673
ackMessage.get.asInstanceOf [BufferMessage ].ackId = bufferMessage.id
0 commit comments