
Commit 54a0ca0

Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test

2 parents: 664095c + 4878911

File tree: 26 files changed, +449 -189 lines

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 5 additions & 2 deletions
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 54 additions & 34 deletions
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
-  extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
+
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
     }
   }
 
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
   /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
     }
   }
 }
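The server now delegates binding to Utils.startServiceOnPort, whose definition is not part of this excerpt. A minimal sketch of the contract it is assumed to implement: try the requested port, retry nearby ports on bind failure, and return the service together with the port actually bound. The signature, retry count, and logging here are illustrative assumptions, not the committed implementation:

    import java.net.BindException
    import scala.util.control.NonFatal

    // Illustrative stand-in for Utils.startServiceOnPort (the real helper lives
    // elsewhere in this commit).
    def startServiceOnPort[T](
        startPort: Int,
        startService: Int => (T, Int),
        serviceName: String = "service",
        maxRetries: Int = 3): (T, Int) = {
      for (offset <- 0 to maxRetries) {
        // Port 0 means "let the OS pick"; keep asking for 0 on every attempt.
        val tryPort = if (startPort == 0) 0 else (startPort + offset) % 65536
        try {
          return startService(tryPort)
        } catch {
          case NonFatal(e) if offset < maxRetries =>
            // Assumed behavior: log the collision and try the next candidate port.
            println(s"$serviceName could not bind on port $tryPort, retrying.")
        }
      }
      throw new BindException(s"$serviceName failed to bind after $maxRetries retries")
    }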

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 1 deletion
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
   }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }
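The new predicate means every spark.*.port setting counts as an executor-startup config and is forwarded when executors launch. A quick standalone illustration of what it matches (the predicate is restated locally so the snippet runs on its own):

    // Same definition as in the diff above, restated for a self-contained check.
    def isSparkPortConf(name: String): Boolean =
      name.startsWith("spark.") && name.endsWith(".port")

    assert(isSparkPortConf("spark.driver.port"))
    assert(isSparkPortConf("spark.broadcast.port"))
    assert(!isSparkPortConf("spark.akka.frameSize")) // caught by the spark.akka prefix instead
    assert(!isSparkPortConf("ui.port"))              // must start with "spark."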

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 6 deletions
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port", boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@ object SparkEnv extends Logging {
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 1 deletion
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
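With the two changes above, the driver's file server and the HTTP broadcast server each read their port from a dedicated config key. A hedged user-facing sketch (the port values are arbitrary; leaving a key unset, or setting it to 0, keeps the ephemeral-port behavior):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.fileserver.port", "41000") // HTTP file server (driver side)
      .set("spark.broadcast.port", "41001")  // HTTP broadcast server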

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 0 additions & 2 deletions
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 1 deletion
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.AkkaUtils
@@ -34,7 +35,7 @@ class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
     port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
     // Bootstrap to fetch the driver's Spark properties.
     val executorConf = new SparkConf
+    val port = executorConf.getInt("spark.executor.port", 0)
     val (fetcher, _) = AkkaUtils.createActorSystem(
-      "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+      "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
     val driver = fetcher.actorSelection(driverUrl)
     val timeout = AkkaUtils.askTimeout(executorConf)
     val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     // Create a new ActorSystem using driver's Spark properties to run the backend.
     val driverConf = new SparkConf().setAll(props)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
+      "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
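Both actor systems here are asked for the same configured port. This presumably works because the bootstrap driverPropsFetcher system is torn down before the sparkExecutor system starts (the shutdown call falls outside this excerpt), and because AkkaUtils.createActorSystem is assumed to fall back to nearby ports on collision like the other services in this commit. A sketch of the sequence, with the assumed step marked:

    // Assumed bootstrap sequence around the two hunks above:
    val port = executorConf.getInt("spark.executor.port", 0)
    val (fetcher, _) = AkkaUtils.createActorSystem(
      "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
    // ... ask the driver for its Spark properties ...
    fetcher.shutdown() // assumption: not shown in this excerpt; frees the port for reuse
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))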

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 11 additions & 3 deletions
@@ -38,8 +38,12 @@ import scala.language.postfixOps
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager")
+  extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -105,7 +109,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
 
-  serverChannel.socket.bind(new InetSocketAddress(port))
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    (serverChannel, serverChannel.socket.getLocalPort)
+  }
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
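The bind is wrapped in a startService function so the retrying helper can probe successive ports. The closure contract: take a candidate port, return the service together with the port actually bound, and throw to trigger a retry. A self-contained illustration using the startServiceOnPort sketch given after the HttpServer.scala diff above (port 9000 is arbitrary):

    import java.net.InetSocketAddress
    import java.nio.channels.ServerSocketChannel

    val channel = ServerSocketChannel.open()
    channel.socket.setReuseAddress(true)

    // Candidate port in, (service, bound port) out; a BindException makes the helper retry.
    def bindChannel(tryPort: Int): (ServerSocketChannel, Int) = {
      channel.socket.bind(new InetSocketAddress(tryPort))
      (channel, channel.socket.getLocalPort)
    }

    val (_, actualPort) = startServiceOnPort(9000, bindChannel, "example service")
    println(s"bound on port $actualPort")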
