Skip to content

Commit ba28a8f

Browse files
rxinaarondav
authored andcommitted
[SPARK-2936] Migrate Netty network module from Java to Scala
The Netty network module was originally written when Scala 2.9.x had a bug that prevents a pure Scala implementation, and a subset of the files were done in Java. We have since upgraded to Scala 2.10, and can migrate all Java files now to Scala. netty/netty#781 mesos/spark#522 Author: Reynold Xin <[email protected]> Closes #1865 from rxin/netty and squashes the following commits: 332422f [Reynold Xin] Code review feedback ca9eeee [Reynold Xin] Minor update. 7f1434b [Reynold Xin] [SPARK-2936] Migrate Netty network module from Java to Scala
1 parent b715aa0 commit ba28a8f

File tree

12 files changed

+292
-364
lines changed

12 files changed

+292
-364
lines changed

core/src/main/java/org/apache/spark/network/netty/FileClient.java

Lines changed: 0 additions & 100 deletions
This file was deleted.

core/src/main/java/org/apache/spark/network/netty/FileServer.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import java.util.concurrent.TimeUnit
21+
22+
import io.netty.bootstrap.Bootstrap
23+
import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
24+
import io.netty.channel.oio.OioEventLoopGroup
25+
import io.netty.channel.socket.oio.OioSocketChannel
26+
27+
import org.apache.spark.Logging
28+
29+
class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging {
30+
31+
private var channel: Channel = _
32+
private var bootstrap: Bootstrap = _
33+
private var group: EventLoopGroup = _
34+
private val sendTimeout = 60
35+
36+
def init(): Unit = {
37+
group = new OioEventLoopGroup
38+
bootstrap = new Bootstrap
39+
bootstrap.group(group)
40+
.channel(classOf[OioSocketChannel])
41+
.option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
42+
.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
43+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout))
44+
.handler(new FileClientChannelInitializer(handler))
45+
}
46+
47+
def connect(host: String, port: Int) {
48+
try {
49+
channel = bootstrap.connect(host, port).sync().channel()
50+
} catch {
51+
case e: InterruptedException =>
52+
logWarning("FileClient interrupted while trying to connect", e)
53+
close()
54+
}
55+
}
56+
57+
def waitForClose(): Unit = {
58+
try {
59+
channel.closeFuture.sync()
60+
} catch {
61+
case e: InterruptedException =>
62+
logWarning("FileClient interrupted", e)
63+
}
64+
}
65+
66+
def sendRequest(file: String): Unit = {
67+
try {
68+
val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS)
69+
if (!bSent) {
70+
throw new RuntimeException("Failed to send")
71+
}
72+
} catch {
73+
case e: InterruptedException =>
74+
logError("Error", e)
75+
}
76+
}
77+
78+
def close(): Unit = {
79+
if (group != null) {
80+
group.shutdownGracefully()
81+
group = null
82+
bootstrap = null
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)