Skip to content

feat(net): add rate limiting logic for P2P messages #6393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: release_v4.8.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,15 @@ public class CommonParameter {
@Getter
public int rateLimiterGlobalApiQps;
@Getter
@Setter
public double rateLimiterSyncBlockChain;
@Getter
@Setter
public double rateLimiterFetchInvData;
@Getter
@Setter
public double rateLimiterDisconnect;
@Getter
public DbBackupConfig dbBackupConfig;
@Getter
public RocksDbSettings rocksDBCustomSettings;
Expand Down
3 changes: 3 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ public class Constant {

public static final String RATE_LIMITER_HTTP = "rate.limiter.http";
public static final String RATE_LIMITER_RPC = "rate.limiter.rpc";
public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain";
Copy link

@Sunny6889 Sunny6889 Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder should config file to add these new configurations with default values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary, they are not critical configuration items. The rate limit parameter configuration is a precautionary solution, these configuration items may never be used in practice.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData";
public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect";

public static final String SEED_NODE_IP_LIST = "seed.node.ip.list";
public static final String NODE_METRICS_ENABLE = "node.metricsEnable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public enum TypeEnum {
PROTOBUF_ERROR(14, "protobuf inconsistent"),
BLOCK_SIGN_ERROR(15, "block sign error"),
BLOCK_MERKLE_ERROR(16, "block merkle error"),
RATE_LIMIT_EXCEEDED(17, "rate limit exceeded"),

DEFAULT(100, "default exception");

Expand Down
15 changes: 15 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ public static void clearParam() {
PARAMETER.rateLimiterGlobalQps = 50000;
PARAMETER.rateLimiterGlobalIpQps = 10000;
PARAMETER.rateLimiterGlobalApiQps = 1000;
PARAMETER.rateLimiterSyncBlockChain = 3.0;
PARAMETER.rateLimiterFetchInvData = 3.0;
PARAMETER.rateLimiterDisconnect = 1.0;
PARAMETER.p2pDisable = false;
PARAMETER.dynamicConfigEnable = false;
PARAMETER.dynamicConfigCheckInterval = 600;
Expand Down Expand Up @@ -1041,6 +1044,18 @@ public static void setParam(final Config config) {

PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config);

PARAMETER.rateLimiterSyncBlockChain =
config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config
.getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0;

PARAMETER.rateLimiterFetchInvData =
config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config
.getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0;

PARAMETER.rateLimiterDisconnect =
config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config
.getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0;

PARAMETER.changedDelegation =
config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config
.getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) {
handshakeService.processHelloMessage(peer, (HelloMessage) msg);
break;
case P2P_DISCONNECT:
peer.getChannel().close();
peer.getNodeStatistics()
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) {
Copy link

@Sunny6889 Sunny6889 Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disconnect need a rate limiter? As in your issue mentioned "After receiving the message, the connection will be disconnected and no response will be given."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question: is this rate limit set per Peer or shared by all Peers? Will this rate limiter effect the normal disconnect logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question: is this rate limit set per Peer or shared by all Peers? Will this rate limiter effect the normal disconnect logic?

This rate limit is set per peer. This rate limiter does not affect the normal disconnection logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disconnect need a rate limiter? As in your issue mentioned "After receiving the message, the connection will be disconnected and no response will be given."

To prevent the peer from sending a large number of repeated disconnect messages at once, it will not respond to the message, but will execute the disconnection logic.

peer.getChannel().close();
peer.getNodeStatistics()
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
}
break;
case SYNC_BLOCK_CHAIN:
syncBlockChainMsgHandler.processMessage(peer, msg);
Expand Down Expand Up @@ -259,6 +261,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex
code = Protocol.ReasonCode.NO_SUCH_MESSAGE;
break;
case BAD_MESSAGE:
case RATE_LIMIT_EXCEEDED:
code = Protocol.ReasonCode.BAD_PROTOCOL;
break;
case SYNC_FAILED:
Expand Down
32 changes: 32 additions & 0 deletions framework/src/main/java/org/tron/core/net/P2pRateLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.tron.core.net;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;

public class P2pRateLimiter {
private final Cache<Byte, RateLimiter> rateLimiters = CacheBuilder.newBuilder()
.maximumSize(32).build();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is a hidden size 32 limit, can make it a large one so it normally won't never hit the limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message types defined by the current protocol are very limited, and the limit of 32 is sufficient for the foreseeable future.


public void register(Byte type, double rate) {
RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
rateLimiter.setRate(rate);
rateLimiters.put(type, rateLimiter);
}

public void acquire(Byte type) {
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
if (rateLimiter == null) {
return;
}
rateLimiter.acquire();
}

public boolean tryAcquire(Byte type) {
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
if (rateLimiter == null) {
return true;
}
return rateLimiter.tryAcquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
if (!peer.isNeedSyncFromUs()) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
}
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumerInvToFetch task is scheduled every 30ms, but the limit FETCH_INV_DATA_RATE is 3 times/s, there will be conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rate limit is based on the premise of block synchronization.

throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType()
+ " message exceeds the rate limit");
}
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:"
+ fetchInvDataMsg.getHashList().size());
}
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
long blockNum = new BlockId(hash).getNum();
long minBlockNum =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
}

private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2pException {
if (peer.getRemainNum() > 0
&& !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
// Discard messages that exceed the rate limit
logger.warn("{} message from peer {} exceeds the rate limit",
msg.getType(), peer.getInetSocketAddress());
return false;
}

List<BlockId> blockIds = msg.getBlockIds();
if (CollectionUtils.isEmpty(blockIds)) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty");
Expand Down
13 changes: 13 additions & 0 deletions framework/src/main/java/org/tron/core/net/peer/PeerConnection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.tron.core.net.peer;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT;
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -32,6 +36,7 @@
import org.tron.core.config.args.Args;
import org.tron.core.metrics.MetricsKey;
import org.tron.core.metrics.MetricsUtil;
import org.tron.core.net.P2pRateLimiter;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
Expand Down Expand Up @@ -156,6 +161,8 @@ public class PeerConnection {
@Setter
@Getter
private volatile boolean needSyncFromUs = true;
@Getter
private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();

public void setChannel(Channel channel) {
this.channel = channel;
Expand All @@ -164,6 +171,12 @@ public void setChannel(Channel channel) {
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
lastInteractiveTime = System.currentTimeMillis();
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(),
Args.getInstance().getRateLimiterSyncBlockChain());
p2pRateLimiter.register(FETCH_INV_DATA.asByte(),
Args.getInstance().getRateLimiterFetchInvData());
p2pRateLimiter.register(P2P_DISCONNECT.asByte(),
Args.getInstance().getRateLimiterDisconnect());
}

public void setBlockBothHave(BlockId blockId) {
Expand Down
23 changes: 23 additions & 0 deletions framework/src/test/java/org/tron/core/net/P2pRateLimiterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.tron.core.net;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;

import org.junit.Assert;
import org.junit.Test;

public class P2pRateLimiterTest {
@Test
public void test() {
P2pRateLimiter limiter = new P2pRateLimiter();
limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2);
limiter.acquire(SYNC_BLOCK_CHAIN.asByte());
boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
Assert.assertTrue(ret);
limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
Assert.assertFalse(ret);
ret = limiter.tryAcquire(FETCH_INV_DATA.asByte());
Assert.assertTrue(ret);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tron.core.net.messagehandler;

import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.lang.reflect.Field;
Expand All @@ -13,6 +15,7 @@
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.Parameter;
import org.tron.core.net.P2pRateLimiter;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.BlockMessage;
import org.tron.core.net.message.adv.FetchInvDataMessage;
Expand Down Expand Up @@ -55,6 +58,9 @@ public void testProcessMessage() throws Exception {
Mockito.when(advService.getMessage(new Item(blockId, Protocol.Inventory.InventoryType.BLOCK)))
.thenReturn(new BlockMessage(blockCapsule));
ReflectUtils.setFieldValue(fetchInvDataMsgHandler, "advService", advService);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);

fetchInvDataMsgHandler.processMessage(peer,
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK));
Expand All @@ -74,6 +80,9 @@ public void testSyncFetchCheck() {
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().maximumSize(100)
.expireAfterWrite(1, TimeUnit.HOURS).recordStats().build();
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);

FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();

Expand All @@ -93,4 +102,36 @@ public void testSyncFetchCheck() {
Assert.assertEquals(e.getMessage(), "minBlockNum: 16000, blockNum: 10000");
}
}

@Test
public void testRateLimiter() {
BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L);
List<Sha256Hash> blockIds = new LinkedList<>();
for (int i = 0; i <= 100; i++) {
blockIds.add(blockId);
}
FetchInvDataMessage msg =
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK);
PeerConnection peer = Mockito.mock(PeerConnection.class);
Mockito.when(peer.isNeedSyncFromUs()).thenReturn(true);
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().maximumSize(100)
.expireAfterWrite(1, TimeUnit.HOURS).recordStats().build();
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1);
p2pRateLimiter.acquire(FETCH_INV_DATA.asByte());
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();

try {
fetchInvDataMsgHandler.processMessage(peer, msg);
} catch (Exception e) {
Assert.assertEquals("fetch too more blocks, size:101", e.getMessage());
}
try {
fetchInvDataMsgHandler.processMessage(peer, msg);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().endsWith("rate limit"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void init() throws Exception {
@Test
public void testProcessMessage() throws Exception {
try {
peer.setRemainNum(1);
handler.processMessage(peer, new SyncBlockChainMessage(new ArrayList<>()));
} catch (P2pException e) {
Assert.assertEquals("SyncBlockChain blockIds is empty", e.getMessage());
Expand All @@ -71,6 +72,10 @@ public void testProcessMessage() throws Exception {
Assert.assertNotNull(message.toString());
Assert.assertNotNull(((BlockInventoryMessage) message).getAnswerMessage());
Assert.assertFalse(f);
method.invoke(handler, peer, message);
method.invoke(handler, peer, message);
f = (boolean)method.invoke(handler, peer, message);
Assert.assertFalse(f);

Method method1 = handler.getClass().getDeclaredMethod(
"getLostBlockIds", List.class, BlockId.class);
Expand Down