Skip to content

Commit 67bcb46

Browse files
put conf at 3rd position, modify suite class, add comments
1 parent f450cd1 commit 67bcb46

File tree

7 files changed

+12
-10
lines changed

7 files changed

+12
-10
lines changed

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private[spark] class HttpServer(
5757
} else {
5858
logInfo("Starting HTTP Server")
5959
val (actualServer, actualPort) =
60-
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName, new SparkConf())
60+
Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
6161
server = actualServer
6262
port = actualPort
6363
}

core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
174174
serverChannel.socket.bind(new InetSocketAddress(port))
175175
(serverChannel, serverChannel.socket.getLocalPort)
176176
}
177-
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name, conf)
177+
Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
178178
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
179179

180180
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
201201
}
202202
}
203203

204-
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName, conf)
204+
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
205205
ServerInfo(server, boundPort, collection)
206206
}
207207

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
5353
val startService: Int => (ActorSystem, Int) = { actualPort =>
5454
doCreateActorSystem(name, host, actualPort, conf, securityManager)
5555
}
56-
Utils.startServiceOnPort(port, startService, name, conf)
56+
Utils.startServiceOnPort(port, startService, conf, name)
5757
}
5858

5959
private def doCreateActorSystem(

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1706,16 +1706,17 @@ private[spark] object Utils extends Logging {
17061706
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
17071707
*
17081708
* @param startPort The initial port to start the service on.
1709-
* @param maxRetries Maximum number of retries to attempt.
1710-
* A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
17111709
* @param startService Function to start service on a given port.
17121710
* This is expected to throw java.net.BindException on port collision.
1711+
* @param conf Used to get maximum number of retries.
1712+
* @param serviceName Name of the service.
17131713
*/
17141714
def startServiceOnPort[T](
17151715
startPort: Int,
17161716
startService: Int => (T, Int),
1717-
serviceName: String = "",
1718-
conf: SparkConf): (T, Int) = {
1717+
conf: SparkConf,
1718+
serviceName: String = ""
1719+
): (T, Int) = {
17191720
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
17201721
val maxRetries = portMaxRetries(conf)
17211722
logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
8080
val socket = new ServerSocket(trialPort)
8181
socket.close()
8282
(null, trialPort)
83-
})._2
83+
}, conf)._2
8484
}
8585

8686
/** Setup and start the streaming context */

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.storage.StorageLevel
2929
import org.apache.spark.streaming.dstream.ReceiverInputDStream
3030
import org.eclipse.paho.client.mqttv3._
3131
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
32+
import org.apache.spark.SparkConf
3233

3334
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
3435

@@ -101,7 +102,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
101102
val socket = new ServerSocket(trialPort)
102103
socket.close()
103104
(null, trialPort)
104-
})._2
105+
}, new SparkConf())._2
105106
}
106107

107108
def publishData(data: String): Unit = {

0 commit comments

Comments
 (0)