Skip to content

Commit bb8bd11

Browse files
WangTaoTheTonicAndrew Or
authored andcommitted
[SPARK-5006][Deploy]spark.port.maxRetries doesn't work
https://issues.apache.org/jira/browse/SPARK-5006 I think the issue is produced in #1777. Not digging mesos's backend yet. Maybe should add same logic either. Author: WangTaoTheTonic <[email protected]> Author: WangTao <[email protected]> Closes #3841 from WangTaoTheTonic/SPARK-5006 and squashes the following commits: 8cdf96d [WangTao] indent thing 2d86d65 [WangTaoTheTonic] fix line length 7cdfd98 [WangTaoTheTonic] fit for new HttpServer constructor 61a370d [WangTaoTheTonic] some minor fixes bc6e1ec [WangTaoTheTonic] rebase 67bcb46 [WangTaoTheTonic] put conf at 3rd position, modify suite class, add comments f450cd1 [WangTaoTheTonic] startServiceOnPort will use a SparkConf arg 29b751b [WangTaoTheTonic] rebase as ExecutorRunnableUtil changed to ExecutorRunnable 396c226 [WangTaoTheTonic] make the grammar more like scala 191face [WangTaoTheTonic] invalid value name 62ec336 [WangTaoTheTonic] spark.port.maxRetries doesn't work Conflicts: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
1 parent 37db20c commit bb8bd11

File tree

14 files changed

+34
-32
lines changed

14 files changed

+34
-32
lines changed

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import com.google.common.io.Files
2424
import org.apache.spark.util.Utils
2525

2626
private[spark] class HttpFileServer(
27+
conf: SparkConf,
2728
securityManager: SecurityManager,
2829
requestedPort: Int = 0)
2930
extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
4142
fileDir.mkdir()
4243
jarDir.mkdir()
4344
logInfo("HTTP File server directory is " + baseDir)
44-
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
45+
httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
4546
httpServer.start()
4647
serverUri = httpServer.uri
4748
logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
4242
* around a Jetty server.
4343
*/
4444
private[spark] class HttpServer(
45+
conf: SparkConf,
4546
resourceBase: File,
4647
securityManager: SecurityManager,
4748
requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
5758
} else {
5859
logInfo("Starting HTTP Server")
5960
val (actualServer, actualPort) =
60-
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
61+
Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
6162
server = actualServer
6263
port = actualPort
6364
}

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
370370
}
371371

372372
/**
373-
* Return whether the given config is a Spark port config.
373+
* Return true if the given config matches either `spark.*.port` or `spark.port.*`.
374374
*/
375-
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
375+
def isSparkPortConf(name: String): Boolean = {
376+
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
377+
}
376378
}

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ object SparkEnv extends Logging {
316316
val httpFileServer =
317317
if (isDriver) {
318318
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
319-
val server = new HttpFileServer(securityManager, fileServerPort)
319+
val server = new HttpFileServer(conf, securityManager, fileServerPort)
320320
server.initialize()
321321
conf.set("spark.fileserver.uri", server.serverUri)
322322
server

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
153153
private def createServer(conf: SparkConf) {
154154
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
155155
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
156-
server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
156+
server =
157+
new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
157158
server.start()
158159
serverUri = server.uri
159160
logInfo("Broadcast server started at " + serverUri)

core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private[nio] class ConnectionManager(
164164
serverChannel.socket.bind(new InetSocketAddress(port))
165165
(serverChannel, serverChannel.socket.getLocalPort)
166166
}
167-
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
167+
Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
168168
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
169169

170170
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
201201
}
202202
}
203203

204-
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
204+
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
205205
ServerInfo(server, boundPort, collection)
206206
}
207207

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
5353
val startService: Int => (ActorSystem, Int) = { actualPort =>
5454
doCreateActorSystem(name, host, actualPort, conf, securityManager)
5555
}
56-
Utils.startServiceOnPort(port, startService, name)
56+
Utils.startServiceOnPort(port, startService, conf, name)
5757
}
5858

5959
private def doCreateActorSystem(

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,17 +1689,15 @@ private[spark] object Utils extends Logging {
16891689
}
16901690

16911691
/**
1692-
* Default maximum number of retries when binding to a port before giving up.
1692+
* Maximum number of retries when binding to a port before giving up.
16931693
*/
1694-
val portMaxRetries: Int = {
1695-
if (sys.props.contains("spark.testing")) {
1694+
def portMaxRetries(conf: SparkConf): Int = {
1695+
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
1696+
if (conf.contains("spark.testing")) {
16961697
// Set a higher number of retries for tests...
1697-
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
1698+
maxRetries.getOrElse(100)
16981699
} else {
1699-
Option(SparkEnv.get)
1700-
.flatMap(_.conf.getOption("spark.port.maxRetries"))
1701-
.map(_.toInt)
1702-
.getOrElse(16)
1700+
maxRetries.getOrElse(16)
17031701
}
17041702
}
17051703

@@ -1708,17 +1706,18 @@ private[spark] object Utils extends Logging {
17081706
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
17091707
*
17101708
* @param startPort The initial port to start the service on.
1711-
* @param maxRetries Maximum number of retries to attempt.
1712-
* A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
17131709
* @param startService Function to start service on a given port.
17141710
* This is expected to throw java.net.BindException on port collision.
1711+
* @param conf A SparkConf used to get the maximum number of retries when binding to a port.
1712+
* @param serviceName Name of the service.
17151713
*/
17161714
def startServiceOnPort[T](
17171715
startPort: Int,
17181716
startService: Int => (T, Int),
1719-
serviceName: String = "",
1720-
maxRetries: Int = portMaxRetries): (T, Int) = {
1717+
conf: SparkConf,
1718+
serviceName: String = ""): (T, Int) = {
17211719
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
1720+
val maxRetries = portMaxRetries(conf)
17221721
for (offset <- 0 to maxRetries) {
17231722
// Do not increment port if startPort is 0, which is treated as a special port
17241723
val tryPort = if (startPort == 0) {

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
8080
val socket = new ServerSocket(trialPort)
8181
socket.close()
8282
(null, trialPort)
83-
})._2
83+
}, conf)._2
8484
}
8585

8686
/** Setup and start the streaming context */

0 commit comments

Comments
 (0)