Skip to content

[HADOOP-16901] boost ShortCircuit Cache #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: cdh6.3.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ClientContext {
/**
* Caches short-circuit file descriptors, mmap regions.
*/
private final ShortCircuitCache shortCircuitCache;
private final ShortCircuitCache[] shortCircuitCache;

/**
* Caches TCP and UNIX domain sockets for reuse.
Expand Down Expand Up @@ -114,6 +114,11 @@ public class ClientContext {
* didn't match its config values yet.
*/
private boolean printedConfWarning = false;

/**
* ShorCircuitCache array size.
*/
private final int clientShortCircuitNum;

private NodeBase clientNode;
private boolean topologyResolutionEnabled;
Expand All @@ -124,7 +129,11 @@ private ClientContext(String name, DfsClientConf conf,

this.name = name;
this.confString = scConf.confAsString();
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
this.clientShortCircuitNum = conf.clientShortCircuitNum;
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
}
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache(
Expand Down Expand Up @@ -207,8 +216,8 @@ public String getConfString() {
return confString;
}

public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache;
public ShortCircuitCache getShortCircuitCache(long idx) {
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
}

public PeerCache getPeerCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -237,7 +239,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int smallBufferSize;
private final long serverDefaultsValidityPeriod;
private final int clientShortCircuitNum;


public DfsClientConf getConf() {
return dfsClientConf;
}
Expand Down Expand Up @@ -386,6 +390,11 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
this.initThreadsNumForHedgedReads(dfsClientConf.
getHedgedReadThreadpoolSize());
}

this.clientShortCircuitNum = conf.getInt(DFS_CLIENT_SHORT_CIRCUIT_NUM,
DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
this.clientShortCircuitNum = this.clientShortCircuitNum > 3 ? 3 : this.clientShortCircuitNum;
this.clientShortCircuitNum = this.clientShortCircuitNum < 1 ? 1 : this.clientShortCircuitNum;

this.initThreadsNumForStripedReads(dfsClientConf.
getStripedReadThreadpoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public interface HdfsClientConfigKeys {
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
PREFIX + "replica.accessor.builder.classes";

String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;

// The number of NN response dropped by client proactively in each RPC call.
// For testing NN retry cache, we can set this property with positive value.
String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ private BlockReader getBlockReaderLocal() throws IOException {
"giving up on BlockReaderLocal.", this, pathInfo);
return null;
}
ShortCircuitCache cache = clientContext.getShortCircuitCache();


ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
Expand Down Expand Up @@ -525,7 +527,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId());
try {
MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer,
Expand Down Expand Up @@ -580,7 +582,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
*/
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId());
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId();
Expand Down