Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,7 @@ public interface ISyncConfig : IConfig

[ConfigItem(Description = "_Technical._ Estimated max size of blocks in block processing queue before stop downloading.", DefaultValue = "200000000", HiddenFromDocs = true)]
long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; }

[ConfigItem(Description = "The maximum number of concurrent allocations per peer per allocation context. Allows a peer to serve multiple requests simultaneously.", DefaultValue = "2")]
int MaxAllocationsPerPeerPerContext { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public string? PivotHash
public bool EnableSnapSyncStorageRangeSplit { get; set; } = false;
public long ForwardSyncDownloadBufferMemoryBudget { get; set; } = 200.MiB();
public long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; } = 200.MiB();
public int MaxAllocationsPerPeerPerContext { get; set; } = 2;

public override string ToString()
{
Expand Down
10 changes: 8 additions & 2 deletions src/Nethermind/Nethermind.Synchronization.Test/PeerInfoTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public void Can_allocate()
peerInfo.IsAllocated(_contexts).Should().BeFalse();
peerInfo.TryAllocate(_contexts);
peerInfo.IsAllocated(_contexts).Should().BeTrue();
// With default maxAllocationsPerContext = 2, we can still allocate again
peerInfo.CanBeAllocated(_contexts).Should().BeTrue();
// Allocate a second time
peerInfo.TryAllocate(_contexts).Should().BeTrue();
// Now we've hit the limit
peerInfo.CanBeAllocated(_contexts).Should().BeFalse();
}

Expand All @@ -123,9 +128,10 @@ public void Cannot_allocate_subcontext()
peerInfo.IsAllocated(AllocationContexts.Bodies).Should().BeTrue();
peerInfo.IsAllocated(AllocationContexts.Headers).Should().BeFalse();
peerInfo.IsAllocated(AllocationContexts.Receipts).Should().BeTrue();
peerInfo.CanBeAllocated(AllocationContexts.Bodies).Should().BeFalse();
// With multiple allocations per context, we can still allocate again (up to the limit)
peerInfo.CanBeAllocated(AllocationContexts.Bodies).Should().BeTrue();
peerInfo.CanBeAllocated(AllocationContexts.Headers).Should().BeTrue();
peerInfo.CanBeAllocated(AllocationContexts.Receipts).Should().BeFalse();
peerInfo.CanBeAllocated(AllocationContexts.Receipts).Should().BeTrue();

peerInfo.Free(AllocationContexts.Receipts);
peerInfo.IsAllocated(AllocationContexts.Receipts).Should().BeFalse();
Expand Down
116 changes: 109 additions & 7 deletions src/Nethermind/Nethermind.Synchronization.Test/SyncPeerPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,20 +417,31 @@ public async Task Can_borrow_many()

SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
Assert.That(allocation2.Current, Is.Not.SameAs(allocation1.Current), "first");
SyncPeerAllocation allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation4 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
// With 2 peers and max 2 allocations per peer, all 4 allocations should succeed
Assert.That(allocation1.Current, Is.Not.Null, "first A");
Assert.That(allocation2.Current, Is.Not.Null, "first B");
Assert.That(allocation3.Current, Is.Not.Null, "first C");
Assert.That(allocation4.Current, Is.Not.Null, "first D");

ctx.Pool.Free(allocation1);
ctx.Pool.Free(allocation2);
ctx.Pool.Free(allocation3);
ctx.Pool.Free(allocation4);
Assert.That(allocation1.Current, Is.Null, "null A");
Assert.That(allocation2.Current, Is.Null, "null B");
Assert.That(allocation3.Current, Is.Null, "null C");
Assert.That(allocation4.Current, Is.Null, "null D");

allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
Assert.That(allocation2.Current, Is.Not.SameAs(allocation1.Current));
allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
allocation4 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
Assert.That(allocation1.Current, Is.Not.Null, "second A");
Assert.That(allocation2.Current, Is.Not.Null, "second B");
Assert.That(allocation3.Current, Is.Not.Null, "second C");
Assert.That(allocation4.Current, Is.Not.Null, "second D");
}

[Test]
Expand All @@ -443,13 +454,26 @@ public async Task Does_not_allocate_sleeping_peers()
ctx.Pool.ReportNoSyncProgress(ctx.Pool.InitializedPeers.First(), AllocationContexts.All);
}

// With 1 sleeping peer and 2 awake peers, max 2 allocations per peer = 4 total allocations possible
SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation4 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
// Fifth allocation should fail as we've maxed out the 2 awake peers (2 slots each)
SyncPeerAllocation allocation5 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers, 50);

Assert.That(allocation1.HasPeer, Is.True);
Assert.That(allocation2.HasPeer, Is.True);
Assert.That(allocation3.HasPeer, Is.False);
Assert.That(allocation3.HasPeer, Is.True);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot a sleeping peer cannot allocate even if it has slot.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in a1c3297. Updated test to allocate 4 times (maxing out the 2 awake peers) and then verify the 5th allocation fails, confirming sleeping peers cannot be allocated even with available slots.

Assert.That(allocation4.HasPeer, Is.True);
Assert.That(allocation5.HasPeer, Is.False, "Fifth allocation should fail as all slots on awake peers are full");

// Verify none of the allocations used the sleeping peer
PeerInfo sleepingPeer = ctx.Pool.InitializedPeers.First();
Assert.That(allocation1.Current, Is.Not.SameAs(sleepingPeer));
Assert.That(allocation2.Current, Is.Not.SameAs(sleepingPeer));
Assert.That(allocation3.Current, Is.Not.SameAs(sleepingPeer));
Assert.That(allocation4.Current, Is.Not.SameAs(sleepingPeer));
}

[Test]
Expand Down Expand Up @@ -491,18 +515,96 @@ public async Task Report_invalid_invokes_disconnection()
}

[Test]
public async Task Will_not_allocate_same_peer_to_two_allocations()
public async Task Will_not_allocate_same_peer_to_two_allocations_when_max_is_one()
{
await using Context ctx = new();
// Set maxAllocationsPerPeerPerContext to 1 to restore old behavior
ctx.Pool = new SyncPeerPool(ctx.BlockTree, ctx.Stats, ctx.PeerStrategy, LimboLogs.Instance, 25, 50, SyncPeerPool.DefaultUpgradeIntervalInMs, 1);
SimpleSyncPeerMock[] peers = await SetupPeers(ctx, 1);

SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers, 50);

Assert.That(allocation1.Current?.SyncPeer, Is.SameAs(peers[0]));
Assert.That(allocation2.Current, Is.Null);
}

[Test]
public async Task Can_allocate_same_peer_multiple_times_with_default_config()
{
await using Context ctx = new();
SimpleSyncPeerMock[] peers = await SetupPeers(ctx, 1);

// With default config (maxAllocationsPerPeerPerContext = 2), we should be able to allocate the same peer twice
SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));

Assert.That(allocation1.Current?.SyncPeer, Is.SameAs(peers[0]), "First allocation should succeed");
Assert.That(allocation2.Current?.SyncPeer, Is.SameAs(peers[0]), "Second allocation should also succeed with same peer");
// Both allocations point to the same PeerInfo
Assert.That(allocation1.Current, Is.SameAs(allocation2.Current), "Both allocations should point to same PeerInfo");
}

[Test]
public async Task Cannot_allocate_same_peer_beyond_max_limit()
{
await using Context ctx = new();
SimpleSyncPeerMock[] peers = await SetupPeers(ctx, 1);

// Allocate the peer twice (reaches the limit of 2)
SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));

// Third allocation should fail as we've reached the limit
SyncPeerAllocation allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers, 50);

Assert.That(allocation1.Current?.SyncPeer, Is.SameAs(peers[0]), "First allocation should succeed");
Assert.That(allocation2.Current?.SyncPeer, Is.SameAs(peers[0]), "Second allocation should succeed");
Assert.That(allocation3.Current, Is.Null, "Third allocation should fail");
}

[Test]
public async Task Can_allocate_again_after_freeing()
{
await using Context ctx = new();
SimpleSyncPeerMock[] peers = await SetupPeers(ctx, 1);

// Allocate the peer twice (reaches the limit of 2)
SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));

// Third allocation should fail
SyncPeerAllocation allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers, 50);
Assert.That(allocation3.Current, Is.Null, "Third allocation should fail before freeing");

// Free one allocation
ctx.Pool.Free(allocation1);

// Now we should be able to allocate again
SyncPeerAllocation allocation4 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
Assert.That(allocation4.Current?.SyncPeer, Is.SameAs(peers[0]), "Fourth allocation should succeed after freeing one");
}

[Test]
public async Task Multiple_allocations_work_with_different_contexts()
{
await using Context ctx = new();
SimpleSyncPeerMock[] peers = await SetupPeers(ctx, 1);

// Allocate for Headers context twice
SyncPeerAllocation allocation1 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers);
SyncPeerAllocation allocation2 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true), AllocationContexts.Headers);

// Allocate for Bodies context twice
SyncPeerAllocation allocation3 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Bodies, true), AllocationContexts.Bodies);
SyncPeerAllocation allocation4 = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Bodies, true), AllocationContexts.Bodies);

Assert.That(allocation1.Current?.SyncPeer, Is.SameAs(peers[0]), "First Headers allocation should succeed");
Assert.That(allocation2.Current?.SyncPeer, Is.SameAs(peers[0]), "Second Headers allocation should succeed");
Assert.That(allocation3.Current?.SyncPeer, Is.SameAs(peers[0]), "First Bodies allocation should succeed");
Assert.That(allocation4.Current?.SyncPeer, Is.SameAs(peers[0]), "Second Bodies allocation should succeed");
}

[Test]
public async Task Will_remove_peer_if_times_out_on_init()
{
Expand Down Expand Up @@ -572,8 +674,8 @@ public async Task Can_borrow_async_many()
SyncPeerAllocation[] allocations = allocationTasks.Select(static t => t.Result).ToArray();
SyncPeerAllocation[] successfulAllocations = allocations.Where(static r => r.Current is not null).ToArray();

// we had only two peers and 3 borrow calls so only two are successful
Assert.That(successfulAllocations.Length, Is.EqualTo(2));
// With 2 peers and max 2 allocations per peer, all 3 borrow calls can succeed
Assert.That(successfulAllocations.Length, Is.EqualTo(3));

foreach (SyncPeerAllocation allocation in successfulAllocations)
{
Expand Down
63 changes: 55 additions & 8 deletions src/Nethermind/Nethermind.Synchronization/Peers/PeerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ namespace Nethermind.Synchronization.Peers
{
public class PeerInfo
{
private int _maxAllocationsPerContext = 2;

public PeerInfo(ISyncPeer syncPeer)
{
SyncPeer = syncPeer;
}

public void SetMaxAllocationsPerContext(int max)
{
_maxAllocationsPerContext = max;
}

public NodeClientType PeerClientType => SyncPeer?.ClientType ?? NodeClientType.Unknown;

public AllocationContexts AllocatedContexts { get; private set; }
Expand All @@ -33,6 +40,8 @@ public PeerInfo(ISyncPeer syncPeer)

private ConcurrentDictionary<AllocationContexts, DateTime?> SleepingSince { get; } = new();

private readonly ConcurrentDictionary<AllocationContexts, int> _allocationCounts = new();

public ISyncPeer SyncPeer { get; }

public bool IsInitialized => SyncPeer.IsInitialized;
Expand Down Expand Up @@ -63,9 +72,25 @@ public bool ShouldNotifyNewRange(long earliestNumber, long latestNumber)
[MethodImpl(MethodImplOptions.Synchronized)]
public bool CanBeAllocated(AllocationContexts contexts)
{
return !IsAsleep(contexts) &&
!IsAllocated(contexts) &&
this.SupportsAllocation(contexts);
if (IsAsleep(contexts) || !this.SupportsAllocation(contexts))
{
return false;
}

// Check each single context flag
foreach (KeyValuePair<AllocationContexts, int> allocationIndex in AllocationIndexes)
{
if ((contexts & allocationIndex.Key) == allocationIndex.Key)
{
int currentCount = _allocationCounts.GetOrAdd(allocationIndex.Key, 0);
if (currentCount >= _maxAllocationsPerContext)
{
return false;
}
}
}

return true;
}

[MethodImpl(MethodImplOptions.Synchronized)]
Expand All @@ -83,19 +108,41 @@ public bool IsAllocated(AllocationContexts contexts)
[MethodImpl(MethodImplOptions.Synchronized)]
public bool TryAllocate(AllocationContexts contexts)
{
if (CanBeAllocated(contexts))
if (!CanBeAllocated(contexts))
{
AllocatedContexts |= contexts;
return true;
return false;
}

return false;
// Increment allocation count for each context
foreach (KeyValuePair<AllocationContexts, int> allocationIndex in AllocationIndexes)
{
if ((contexts & allocationIndex.Key) == allocationIndex.Key)
{
_allocationCounts.AddOrUpdate(allocationIndex.Key, 1, (_, count) => count + 1);
}
}

AllocatedContexts |= contexts;
return true;
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void Free(AllocationContexts contexts)
{
AllocatedContexts ^= contexts;
// Decrement allocation count for each context
foreach (KeyValuePair<AllocationContexts, int> allocationIndex in AllocationIndexes)
{
if ((contexts & allocationIndex.Key) == allocationIndex.Key)
{
_allocationCounts.AddOrUpdate(allocationIndex.Key, 0, (_, count) => Math.Max(0, count - 1));

// If count reaches 0, clear the allocated flag for this context
if (_allocationCounts.GetOrAdd(allocationIndex.Key, 0) == 0)
{
AllocatedContexts &= ~allocationIndex.Key;
}
}
}
}

[MethodImpl(MethodImplOptions.Synchronized)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SyncPeerPool : ISyncPeerPool, IPeerDifficultyRefreshPool
private readonly INodeStatsManager _stats;
private readonly IBetterPeerStrategy _betterPeerStrategy;
private readonly int _allocationsUpgradeIntervalInMs;
private readonly int _maxAllocationsPerPeerPerContext;

private bool _isStarted;
private readonly Lock _isAllocatedChecks = new();
Expand All @@ -65,8 +66,9 @@ public SyncPeerPool(IBlockTree blockTree,
INodeStatsManager nodeStatsManager,
IBetterPeerStrategy betterPeerStrategy,
INetworkConfig networkConfig,
ISyncConfig syncConfig,
ILogManager logManager)
: this(blockTree, nodeStatsManager, betterPeerStrategy, logManager, networkConfig.ActivePeersMaxCount, networkConfig.PriorityPeersMaxCount)
: this(blockTree, nodeStatsManager, betterPeerStrategy, logManager, networkConfig.ActivePeersMaxCount, networkConfig.PriorityPeersMaxCount, DefaultUpgradeIntervalInMs, syncConfig.MaxAllocationsPerPeerPerContext)
{

}
Expand All @@ -77,14 +79,16 @@ public SyncPeerPool(IBlockTree blockTree,
ILogManager logManager,
int peersMaxCount = 100,
int priorityPeerMaxCount = 0,
int allocationsUpgradeIntervalInMsInMs = DefaultUpgradeIntervalInMs)
int allocationsUpgradeIntervalInMsInMs = DefaultUpgradeIntervalInMs,
int maxAllocationsPerPeerPerContext = 2)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_stats = nodeStatsManager ?? throw new ArgumentNullException(nameof(nodeStatsManager));
_betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy));
PeerMaxCount = peersMaxCount;
PriorityPeerMaxCount = priorityPeerMaxCount;
_allocationsUpgradeIntervalInMs = allocationsUpgradeIntervalInMsInMs;
_maxAllocationsPerPeerPerContext = maxAllocationsPerPeerPerContext;
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));

if (_logger.IsDebug) _logger.Debug($"PeerMaxCount: {PeerMaxCount}, PriorityPeerMaxCount: {PriorityPeerMaxCount}");
Expand Down Expand Up @@ -241,6 +245,7 @@ public void AddPeer(ISyncPeer syncPeer)
}

PeerInfo peerInfo = new(syncPeer);
peerInfo.SetMaxAllocationsPerContext(_maxAllocationsPerPeerPerContext);
_peers.TryAdd(syncPeer.Node.Id, peerInfo);
UpdatePeerCountMetric(peerInfo.PeerClientType, 1);

Expand Down
Loading