Skip to content

Commit ec676f4

Browse files
committed
Merge branch 'SPARK-2157' of github.com:ash211/spark into configure-ports
2 parents 8e7d5ba + 038a579 commit ec676f4

File tree

9 files changed

+150
-50
lines changed

9 files changed

+150
-50
lines changed

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
3131
var httpServer : HttpServer = null
3232
var serverUri : String = null
3333

34-
def initialize() {
34+
def initialize(port: Option[Int]) {
3535
baseDir = Utils.createTempDir()
3636
fileDir = new File(baseDir, "files")
3737
jarDir = new File(baseDir, "jars")
3838
fileDir.mkdir()
3939
jarDir.mkdir()
4040
logInfo("HTTP File server directory is " + baseDir)
41-
httpServer = new HttpServer(baseDir, securityManager)
41+
httpServer = if (port.isEmpty) {
42+
new HttpServer(baseDir, securityManager)
43+
} else {
44+
new HttpServer(baseDir, securityManager, port.get)
45+
}
4246
httpServer.start()
4347
serverUri = httpServer.uri
4448
logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark
1919

2020
import java.io.File
2121

22+
import org.apache.spark.network.PortManager
2223
import org.eclipse.jetty.util.security.{Constraint, Password}
2324
import org.eclipse.jetty.security.authentication.DigestAuthenticator
2425
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
@@ -41,45 +42,54 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
4142
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
4243
* around a Jetty server.
4344
*/
44-
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
45-
extends Logging {
45+
private[spark] class HttpServer(resourceBase: File,
46+
securityManager: SecurityManager,
47+
localPort: Int = 0) extends Logging {
4648
private var server: Server = null
47-
private var port: Int = -1
49+
private var port: Int = localPort
50+
51+
private def startOnPort(startPort: Int): (Server, Int) = {
52+
val server = new Server()
53+
val connector = new SocketConnector
54+
connector.setMaxIdleTime(60*1000)
55+
connector.setSoLingerTime(-1)
56+
connector.setPort(startPort)
57+
server.addConnector(connector)
58+
59+
val threadPool = new QueuedThreadPool
60+
threadPool.setDaemon(true)
61+
server.setThreadPool(threadPool)
62+
val resHandler = new ResourceHandler
63+
resHandler.setResourceBase(resourceBase.getAbsolutePath)
64+
65+
val handlerList = new HandlerList
66+
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
67+
68+
if (securityManager.isAuthenticationEnabled()) {
69+
logDebug("HttpServer is using security")
70+
val sh = setupSecurityHandler(securityManager)
71+
// make sure we go through security handler to get resources
72+
sh.setHandler(handlerList)
73+
server.setHandler(sh)
74+
} else {
75+
logDebug("HttpServer is not using security")
76+
server.setHandler(handlerList)
77+
}
78+
79+
server.start()
80+
val actualPort = server.getConnectors()(0).getLocalPort()
81+
82+
(server, actualPort)
83+
}
4884

4985
def start() {
5086
if (server != null) {
5187
throw new ServerStateException("Server is already started")
5288
} else {
5389
logInfo("Starting HTTP Server")
54-
server = new Server()
55-
val connector = new SocketConnector
56-
connector.setMaxIdleTime(60*1000)
57-
connector.setSoLingerTime(-1)
58-
connector.setPort(0)
59-
server.addConnector(connector)
60-
61-
val threadPool = new QueuedThreadPool
62-
threadPool.setDaemon(true)
63-
server.setThreadPool(threadPool)
64-
val resHandler = new ResourceHandler
65-
resHandler.setResourceBase(resourceBase.getAbsolutePath)
66-
67-
val handlerList = new HandlerList
68-
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
69-
70-
if (securityManager.isAuthenticationEnabled()) {
71-
logDebug("HttpServer is using security")
72-
val sh = setupSecurityHandler(securityManager)
73-
// make sure we go through security handler to get resources
74-
sh.setHandler(handlerList)
75-
server.setHandler(sh)
76-
} else {
77-
logDebug("HttpServer is not using security")
78-
server.setHandler(handlerList)
79-
}
80-
81-
server.start()
82-
port = server.getConnectors()(0).getLocalPort()
90+
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
91+
server = actualServer
92+
port = actualPort
8393
}
8494
}
8595

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ object SparkEnv extends Logging {
226226
val httpFileServer =
227227
if (isDriver) {
228228
val server = new HttpFileServer(securityManager)
229-
server.initialize()
229+
server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt))
230230
conf.set("spark.fileserver.uri", server.serverUri)
231231
server
232232
} else {

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
152152

153153
private def createServer(conf: SparkConf) {
154154
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
155-
server = new HttpServer(broadcastDir, securityManager)
155+
val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0)
156+
server = new HttpServer(broadcastDir, securityManager, broadcastListenPort)
156157
server.start()
157158
serverUri = server.uri
158159
logInfo("Broadcast server started at " + serverUri)

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
105105
serverChannel.socket.setReuseAddress(true)
106106
serverChannel.socket.setReceiveBufferSize(256 * 1024)
107107

108-
serverChannel.socket.bind(new InetSocketAddress(port))
108+
private def startService(port: Int) = {
109+
serverChannel.socket.bind(new InetSocketAddress(port))
110+
(serverChannel, port)
111+
}
112+
PortManager.startWithIncrements(port, 3, startService)
109113
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
110114

111115
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network
19+
20+
import java.net.InetSocketAddress
21+
22+
import org.apache.spark.{Logging, SparkException}
23+
import org.eclipse.jetty.server.Server
24+
25+
private[spark] object PortManager extends Logging
26+
{
27+
28+
/**
29+
* Start service on given port, or attempt to fall back to the n+1 port for a certain number of
30+
* retries
31+
*
32+
* @param startPort
33+
* @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4
34+
* total attempts, on ports n, n+1, n+2, and n+3
35+
* @param startService Function to start service on a given port. Expected to throw a java.net
36+
* .BindException if the port is already in use
37+
* @tparam T
38+
* @throws SparkException When unable to start service in the given number of attempts
39+
* @return
40+
*/
41+
def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)):
42+
(T, Int) = {
43+
for( offset <- 0 to maxRetries) {
44+
val tryPort = startPort + offset
45+
try {
46+
return startService(tryPort)
47+
} catch {
48+
case e: java.net.BindException => {
49+
if (!e.getMessage.contains("Address already in use") ||
50+
offset == (maxRetries-1)) {
51+
throw e
52+
}
53+
logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1))
54+
}
55+
case e: Exception => throw e
56+
}
57+
}
58+
throw new SparkException(s"Couldn't start service on port $startPort")
59+
}
60+
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ private[spark] class BlockManager(
6363
val shuffleBlockManager = new ShuffleBlockManager(this)
6464
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
6565
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
66-
val connectionManager = new ConnectionManager(0, conf, securityManager)
66+
val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf,
67+
securityManager)
6768

6869
implicit val futureExecContext = connectionManager.futureExecContext
6970

docs/spark-standalone.md

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ configure those ports.
319319
</tr>
320320
<tr>
321321
<td>Browser</td>
322-
<td>Driver</td>
322+
<td>Application</td>
323323
<td>4040</td>
324324
<td>Web UI</td>
325325
<td><code>spark.ui.port</code></td>
@@ -369,18 +369,37 @@ configure those ports.
369369

370370
<!-- Other misc stuff -->
371371
<tr>
372-
<td>Driver and other Workers</td>
373372
<td>Worker</td>
373+
<td>Application</td>
374374
<td>(random)</td>
375-
<td>
376-
<ul>
377-
<li>File server for file and jars</li>
378-
<li>Http Broadcast</li>
379-
<li>Class file server (Spark Shell only)</li>
380-
</ul>
381-
</td>
382-
<td>None</td>
383-
<td>Jetty-based. Each of these services starts on a random port that cannot be configured</td>
375+
<td>File server for files and jars</td>
376+
<td><code>spark.fileserver.port</code></td>
377+
<td>Jetty-based</td>
378+
</tr>
379+
<tr>
380+
<td>Worker</td>
381+
<td>Application</td>
382+
<td>(random)</td>
383+
<td>HTTP Broadcast</td>
384+
<td><code>spark.broadcast.port</code></td>
385+
<td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
386+
instead</td>
387+
</tr>
388+
<tr>
389+
<td>Worker</td>
390+
<td>Spark Shell</td>
391+
<td>(random)</td>
392+
<td>Class file server (Spark Shell only)</td>
393+
<td><code>spark.replClassServer.port</code></td>
394+
<td>Jetty-based</td>
395+
</tr>
396+
<tr>
397+
<td>Worker</td>
398+
<td>Other Workers</td>
399+
<td>(random)</td>
400+
<td>Block Manager port</td>
401+
<td><code>spark.blockManager.port</code></td>
402+
<td>Raw socket via ServerSocketChannel</td>
384403
</tr>
385404

386405
</table>

repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ import org.apache.spark.util.Utils
102102

103103
val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
104104
/** Jetty server that will serve our classes to worker nodes */
105-
val classServer = new HttpServer(outputDir, new SecurityManager(conf))
105+
val classServerListenPort = conf.getInt("spark.replClassServer.port", 0)
106+
val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort)
106107
private var currentSettings: Settings = initialSettings
107108
var printResults = true // whether to print result lines
108109
var totalSilence = false // whether to print anything

0 commit comments

Comments
 (0)