
Commit 7370eab

Config updates for the new shuffle transport.
1 parent: 2b9b726 · commit 7370eab

3 files changed (+6 lines, -6 lines)


network/common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 4 additions & 4 deletions
@@ -35,14 +35,14 @@ public boolean preferDirectBufs() {
     return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
   }
 
-  /** Connect timeout in secs. Default 120 secs. */
+  /** Connect timeout in milliseconds. Default 120 secs. */
   public int connectionTimeoutMs() {
     return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. **/
   public int numConnectionsPerPeer() {
-    return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2);
+    return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
   }
 
   /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
@@ -67,7 +67,7 @@ public int numConnectionsPerPeer() {
   public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
 
   /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
-  public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); }
+  public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }
 
   /**
    * Max number of times we will try IO exceptions (such as connection timeouts) per request.
@@ -79,7 +79,7 @@ public int numConnectionsPerPeer() {
    * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
    * Only relevant if maxIORetries > 0.
    */
-  public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+  public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }
 
   /**
    * Minimum size of a block that we should start using memory map rather than reading in through
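The TransportConf changes follow one convention: the timeout-related keys (spark.shuffle.sasl.timeout, spark.shuffle.io.retryWait) are now set in whole seconds, and the renamed *Ms() accessors multiply by 1000 before handing milliseconds to callers. Below is a minimal, self-contained sketch of that convention using a plain map-backed lookup; it is not the real TransportConf/ConfigProvider classes, just an illustration of the seconds-to-milliseconds pattern.

// Minimal sketch, not the actual Spark classes: timeout configs are specified in
// whole seconds, and the *Ms() accessor converts them to milliseconds internally.
import java.util.HashMap;
import java.util.Map;

public class TimeoutConfSketch {
  private final Map<String, String> settings = new HashMap<>();

  public void set(String key, String value) {
    settings.put(key, value);
  }

  private int getInt(String key, int defaultValue) {
    String value = settings.get(key);
    return value == null ? defaultValue : Integer.parseInt(value);
  }

  /** Mirrors ioRetryWaitTimeMs(): the key holds seconds, the return value is milliseconds. */
  public int ioRetryWaitTimeMs() {
    return getInt("spark.shuffle.io.retryWait", 5) * 1000;
  }

  public static void main(String[] args) {
    TimeoutConfSketch conf = new TimeoutConfSketch();
    System.out.println(conf.ioRetryWaitTimeMs());   // 5000 (default of 5 seconds)
    conf.set("spark.shuffle.io.retryWait", "10");
    System.out.println(conf.ioRetryWaitTimeMs());   // 10000 (10 seconds -> ms)
  }
}

Running the sketch prints 5000 for the default and 10000 once the key is set to 10, which is exactly the behavior the renamed accessors above encode.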

network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ public void doBootstrap(TransportClient client) {
         ByteBuf buf = Unpooled.buffer(msg.encodedLength());
         msg.encode(buf);
 
-        byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout());
+        byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
         payload = saslClient.response(response);
       }
     } finally {
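SaslClientBootstrap simply picks up the renamed accessor: the synchronous RPC for each SASL round trip is bounded by the millisecond timeout from conf.saslRTTimeoutMs() (30 * 1000 = 30000 ms by default). A hedged, standalone sketch of that blocking-with-timeout pattern using java.util.concurrent; the real TransportClient.sendRpcSync is Netty-based, so this is only an illustration of how such a millisecond timeout is consumed.

// Standalone sketch: a blocking "RPC" bounded by a millisecond timeout via
// Future#get(timeout, MILLISECONDS). Placeholder round trip, not the Netty client.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncRpcSketch {
  private static final ExecutorService POOL = Executors.newSingleThreadExecutor();

  static byte[] sendRpcSync(byte[] request, long timeoutMs) throws Exception {
    Callable<byte[]> roundTrip = () -> request.clone();  // placeholder for the network call
    Future<byte[]> response = POOL.submit(roundTrip);
    // timeoutMs would come from something like conf.saslRTTimeoutMs() in the real code.
    return response.get(timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    byte[] reply = sendRpcSync(new byte[] {1, 2, 3}, 30_000);
    System.out.println("got " + reply.length + " bytes back");
    POOL.shutdown();
  }
}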

network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ public RetryingBlockFetcher(
     this.fetchStarter = fetchStarter;
     this.listener = listener;
     this.maxRetries = conf.maxIORetries();
-    this.retryWaitTime = conf.ioRetryWaitTime();
+    this.retryWaitTime = conf.ioRetryWaitTimeMs();
     this.outstandingBlocksIds = Sets.newLinkedHashSet();
     Collections.addAll(outstandingBlocksIds, blockIds);
     this.currentListener = new RetryingBlockFetchListener();
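RetryingBlockFetcher likewise switches to the renamed accessor, so retryWaitTime now carries a millisecond value (5000 ms with the new 5-second default) that is waited out before re-attempting after an IOException, up to maxIORetries attempts. The following is a standalone sketch of how such a wait and retry cap are typically consumed, not the actual fetcher logic (which also tracks outstanding block IDs and listeners).

// Standalone retry-loop sketch: the 5000 and 3 below are hypothetical stand-ins
// for conf.ioRetryWaitTimeMs() and conf.maxIORetries().
public class RetrySketch {
  static void runWithRetries(Runnable task, int maxRetries, int retryWaitTimeMs)
      throws InterruptedException {
    for (int attempt = 0; ; attempt++) {
      try {
        task.run();
        return;                         // success, no retry needed
      } catch (RuntimeException e) {
        if (attempt >= maxRetries) {
          throw e;                      // retries exhausted, surface the failure
        }
        Thread.sleep(retryWaitTimeMs);  // wait (in milliseconds) before retrying
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    runWithRetries(() -> System.out.println("fetching blocks..."), 3, 5000);
  }
}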
