diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index ad1b3595cbd5..5722de27bda4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -76,7 +76,7 @@ public class ClientContext { /** * Caches short-circuit file descriptors, mmap regions. */ - private final ShortCircuitCache shortCircuitCache; + private final ShortCircuitCache[] shortCircuitCache; /** * Caches TCP and UNIX domain sockets for reuse. @@ -114,6 +114,11 @@ public class ClientContext { * didn't match its config values yet. */ private boolean printedConfWarning = false; + + /** + * ShorCircuitCache array size. + */ + private final int clientShortCircuitNum; private NodeBase clientNode; private boolean topologyResolutionEnabled; @@ -124,7 +129,11 @@ private ClientContext(String name, DfsClientConf conf, this.name = name; this.confString = scConf.confAsString(); - this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + this.clientShortCircuitNum = conf.clientShortCircuitNum; + this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; + for (int i = 0; i < this.clientShortCircuitNum; i++) { + this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + } this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( @@ -207,8 +216,8 @@ public String getConfString() { return confString; } - public ShortCircuitCache getShortCircuitCache() { - return shortCircuitCache; + public ShortCircuitCache getShortCircuitCache(long idx) { + return shortCircuitCache[(int) (idx % clientShortCircuitNum)]; } public PeerCache getPeerCache() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index fc98c6527a29..352e82cff7bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -27,6 +27,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM; import java.io.DataOutputStream; import java.io.FileNotFoundException; @@ -237,7 +239,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final int smallBufferSize; private final long serverDefaultsValidityPeriod; + private final int clientShortCircuitNum; + public DfsClientConf getConf() { return dfsClientConf; } @@ -386,6 +390,11 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, this.initThreadsNumForHedgedReads(dfsClientConf. getHedgedReadThreadpoolSize()); } + + this.clientShortCircuitNum = conf.getInt(DFS_CLIENT_SHORT_CIRCUIT_NUM, + DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT); + this.clientShortCircuitNum = this.clientShortCircuitNum > 3 ? 3 : this.clientShortCircuitNum; + this.clientShortCircuitNum = this.clientShortCircuitNum < 1 ? 1 : this.clientShortCircuitNum; this.initThreadsNumForStripedReads(dfsClientConf. getStripedReadThreadpoolSize()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index e3626d2b8ee4..3b99685b5765 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -167,6 +167,9 @@ public interface HdfsClientConfigKeys { String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY = PREFIX + "replica.accessor.builder.classes"; + String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num"; + int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1; + // The number of NN response dropped by client proactively in each RPC call. // For testing NN retry cache, we can set this property with positive value. String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index bcb64feba310..4b76ca43bb3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -474,7 +474,9 @@ private BlockReader getBlockReaderLocal() throws IOException { "giving up on BlockReaderLocal.", this, pathInfo); return null; } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + + + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); @@ -525,7 +527,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); slot = cache.allocShmSlot(datanode, peer, usedPeer, @@ -580,7 +582,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); SlotId slotId = slot == null ? null : slot.getSlotId();