Skip to content

feat(net): add logs and adjust disconnection strategy #5944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 14, 2024
3 changes: 3 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,9 @@ public static void setParam(final String[] args, final String confFileName) {

PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD)
? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600;
if (PARAMETER.inactiveThreshold < 1) {
PARAMETER.inactiveThreshold = 1;
}

PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE)
? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public String log() {
+ "syncBlockRequestedSize:%d\n"
+ "remainNum:%d\n"
+ "syncChainRequested:%d\n"
+ "inactiveSeconds:%d\n"
+ "blockInProcess:%d\n",
channel.getInetSocketAddress(),
(now - channel.getStartTime()) / Constant.ONE_THOUSAND,
Expand All @@ -244,6 +245,7 @@ public String log() {
remainNum,
requested == null ? 0 : (now - requested.getValue())
/ Constant.ONE_THOUSAND,
(now - lastActiveTime) / Constant.ONE_THOUSAND,
syncBlockInProcess.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ResilienceService {

private static final long inactiveThreshold =
CommonParameter.getInstance().getInactiveThreshold() * 1000L;
public static final long blockNotChangeThreshold = 90 * 1000L;
public static final long blockNotChangeThreshold = 60 * 1000L;

//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
Expand Down Expand Up @@ -74,75 +74,81 @@ private void disconnectRandom() {
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.collect(Collectors.toList());
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION);
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
DisconnectCause.RANDOM_ELIMINATION);
}
}
}

private void disconnectLan() {
if (isLanNode()) {
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
if (!isLanNode()) {
return;
}
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this conditional statement redundant?

Copy link
Contributor Author

@317787106 317787106 Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not redundant, condition of lan is different: peerSize > CommonParameter.getInstance().getMinActiveConnections().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it '>', shouldn't lan nodes be '=' in most scenarios?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ">=", not ">".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about why the comparison in the isLanNode method is '>'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I will revise it.

Copy link
Contributor

@fyyhtx fyyhtx Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's go back to the original question. How do you think about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even it's a lan node, it will disconnect from some nodes only if peerSize >= CommonParameter.getInstance().getMinConnections(). Later condition is more strict. It's not redundant.

long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(
peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL, DisconnectCause.LAN_NODE));
}
}

private void disconnectIsolated2() {
if (isIsolateLand2()) {
logger.info("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
if (!isIsolateLand2()) {
return;
}
logger.warn("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}
//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL,
DisconnectCause.ISOLATE2_ACTIVE));
}

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}
int candidateSize = peers.size();
if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
logger.info("All peer Size:{}, plan size:{}, candidate size:{}, real size:{}", peerSize,
disconnectSize, candidateSize, peers.size());
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL,
DisconnectCause.ISOLATE2_PASSIVE));
}
}

Expand All @@ -162,7 +168,8 @@ private boolean isLanNode() {
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.getChannel().isActive())
.count();
return peerSize > 0 && peerSize == activePeerSize;
return peerSize >= CommonParameter.getInstance().getMinActiveConnections()
&& peerSize == activePeerSize;
}

private boolean isIsolateLand2() {
Expand All @@ -173,13 +180,21 @@ private boolean isIsolateLand2() {
return advPeerCount >= 1 && diff >= blockNotChangeThreshold;
}

private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) {
private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode,
DisconnectCause cause) {
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000);
logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(),
inactiveSeconds);
logger.info("Disconnect from peer {}, inactive seconds {}, cause: {}",
peer.getInetSocketAddress(), inactiveSeconds, cause);
peer.disconnect(reasonCode);
}

private enum DisconnectCause {
RANDOM_ELIMINATION,
LAN_NODE,
ISOLATE2_ACTIVE,
ISOLATE2_PASSIVE,
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public void testDisconnectRandom() {

PeerManager.add(context, c1);
}
for (PeerConnection peer : PeerManager.getPeers()) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
}
ReflectUtils.invokeMethod(service, "disconnectRandom");
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());

Expand Down Expand Up @@ -93,7 +97,10 @@ public void testDisconnectLan() {

PeerManager.add(context, c1);
}

for (PeerConnection peer : PeerManager.getPeers()) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
}
Assert.assertEquals(9, PeerManager.getPeers().size());

boolean isLan = ReflectUtils.invokeMethod(service, "isLanNode");
Expand Down