Skip to content

Commit 1bf5109

Browse files
author
Andrew Or
committed
Use the shuffle service port specified through hadoop config
1 parent b4b1f0c commit 1bf5109

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import akka.actor.{ActorSystem, Props}
3030
import sun.nio.ch.DirectBuffer
3131

3232
import org.apache.spark._
33+
import org.apache.spark.deploy.SparkHadoopUtil
3334
import org.apache.spark.executor._
3435
import org.apache.spark.io.CompressionCodec
3536
import org.apache.spark.network._
@@ -92,7 +93,19 @@ private[spark] class BlockManager(
9293

9394
private[spark]
9495
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
95-
private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
96+
97+
// In Yarn, the shuffle service port maybe set through the Hadoop config
98+
private val shuffleServicePortKey = "spark.shuffle.service.port"
99+
private val externalShuffleServicePort = {
100+
val sparkPort = conf.getInt(shuffleServicePortKey, 7337)
101+
if (SparkHadoopUtil.get.isYarnMode) {
102+
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
103+
Option(hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort)
104+
} else {
105+
sparkPort
106+
}
107+
}
108+
96109
// Check that we're not using external shuffle service with consolidated shuffle files.
97110
if (externalShuffleServiceEnabled
98111
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected void serviceInit(Configuration conf) {
4949
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
5050
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
5151
transportContext.createServer(port);
52+
logger.info("Started Yarn shuffle service for Spark on port " + port);
5253
} catch (Exception e) {
5354
logger.error("Exception in starting Yarn shuffle service for Spark", e);
5455
}

0 commit comments

Comments
 (0)