-
Notifications
You must be signed in to change notification settings - Fork 770
Comm Component for Simplex #3998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
dde076d
c5be0a4
5de26b1
52d343e
3f74024
d0f0102
458d540
49aa0bb
4dfdb34
475a5ef
8c78c03
14ad42f
2580d63
ed4422b
14aef3e
7d3484d
b13bc72
88b06d5
5f84156
3c362ea
03bdc02
ebdbfc9
7a7e5e7
d9b8a2c
c964299
12599d2
a3820e4
5119e93
84106b0
35c0161
e8a91ec
ad8ea62
97c816b
91ec652
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,143 @@ | ||||||||||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||||||||||
// See the file LICENSE for licensing terms. | ||||||||||
|
||||||||||
package simplex | ||||||||||
|
||||||||||
import ( | ||||||||||
"bytes" | ||||||||||
"errors" | ||||||||||
"fmt" | ||||||||||
"slices" | ||||||||||
|
||||||||||
"github.com/ava-labs/simplex" | ||||||||||
"go.uber.org/zap" | ||||||||||
|
||||||||||
"github.com/ava-labs/avalanchego/ids" | ||||||||||
"github.com/ava-labs/avalanchego/message" | ||||||||||
"github.com/ava-labs/avalanchego/proto/pb/p2p" | ||||||||||
"github.com/ava-labs/avalanchego/snow/engine/common" | ||||||||||
"github.com/ava-labs/avalanchego/snow/networking/sender" | ||||||||||
"github.com/ava-labs/avalanchego/subnets" | ||||||||||
"github.com/ava-labs/avalanchego/utils/set" | ||||||||||
) | ||||||||||
|
||||||||||
var ( | ||||||||||
_ simplex.Communication = (*Comm)(nil) | ||||||||||
errNodeNotFound = errors.New("node not found in the validator list") | ||||||||||
) | ||||||||||
|
||||||||||
type Comm struct { | ||||||||||
logger simplex.Logger | ||||||||||
subnetID ids.ID | ||||||||||
chainID ids.ID | ||||||||||
// nodeID is this nodes ID | ||||||||||
nodeID simplex.NodeID | ||||||||||
// nodes are the IDs of all the nodes in the subnet | ||||||||||
nodes []simplex.NodeID | ||||||||||
// sender is used to send messages to other nodes | ||||||||||
sender sender.ExternalSender | ||||||||||
msgBuilder message.OutboundMsgBuilder | ||||||||||
} | ||||||||||
|
||||||||||
func NewComm(config *Config) (*Comm, error) { | ||||||||||
nodes := make([]simplex.NodeID, 0, len(config.Validators)) | ||||||||||
|
||||||||||
// grab all the nodes that are validators for the subnet | ||||||||||
for _, vd := range config.Validators { | ||||||||||
nodes = append(nodes, vd.NodeID[:]) | ||||||||||
} | ||||||||||
|
||||||||||
if _, ok := config.Validators[config.Ctx.NodeID]; !ok { | ||||||||||
samliok marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the plan for non-validator nodes? Is this going to be modified in the future? Or is there going to be some other implementation entirely for non-validators? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no concrete written plan yet, but I think this interface should not be used for non validators. Right now, we're wiring the epoch object into Simplex despite it not being the most high level component. |
||||||||||
config.Log.Warn("Node is not a validator for the subnet", | ||||||||||
zap.String("nodeID", config.Ctx.NodeID.String()), | ||||||||||
zap.String("chainID", config.Ctx.ChainID.String()), | ||||||||||
zap.String("subnetID", config.Ctx.SubnetID.String()), | ||||||||||
) | ||||||||||
return nil, fmt.Errorf("%w could not find our node: %s", errNodeNotFound, config.Ctx.NodeID) | ||||||||||
} | ||||||||||
|
||||||||||
sortNodes(nodes) | ||||||||||
|
||||||||||
c := &Comm{ | ||||||||||
subnetID: config.Ctx.SubnetID, | ||||||||||
nodes: nodes, | ||||||||||
nodeID: config.Ctx.NodeID[:], | ||||||||||
logger: config.Log, | ||||||||||
sender: config.Sender, | ||||||||||
msgBuilder: config.OutboundMsgBuilder, | ||||||||||
chainID: config.Ctx.ChainID, | ||||||||||
} | ||||||||||
|
||||||||||
return c, nil | ||||||||||
} | ||||||||||
|
||||||||||
// sortNodes sorts the nodes in place by their byte representations. | ||||||||||
func sortNodes(nodes []simplex.NodeID) { | ||||||||||
slices.SortFunc(nodes, func(i, j simplex.NodeID) int { | ||||||||||
return bytes.Compare(i, j) | ||||||||||
}) | ||||||||||
} | ||||||||||
|
||||||||||
func (c *Comm) ListNodes() []simplex.NodeID { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I feel like the return type makes it obvious that this is a list. Should this just be:
Suggested change
? Or perhaps even
Suggested change
|
||||||||||
return c.nodes | ||||||||||
} | ||||||||||
|
||||||||||
func (c *Comm) SendMessage(msg *simplex.Message, destination simplex.NodeID) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
|
||||||||||
outboundMsg, err := c.simplexMessageToOutboundMessage(msg) | ||||||||||
if err != nil { | ||||||||||
c.logger.Error("Failed creating message", zap.Error(err)) | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
dest, err := ids.ToNodeID(destination) | ||||||||||
if err != nil { | ||||||||||
c.logger.Error("Failed to convert destination NodeID", zap.Error(err)) | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
c.sender.Send(outboundMsg, common.SendConfig{NodeIDs: set.Of(dest)}, c.subnetID, subnets.NoOpAllower) | ||||||||||
} | ||||||||||
|
||||||||||
func (c *Comm) Broadcast(msg *simplex.Message) { | ||||||||||
for _, node := range c.nodes { | ||||||||||
if node.Equals(c.nodeID) { | ||||||||||
continue | ||||||||||
} | ||||||||||
|
||||||||||
c.SendMessage(msg, node) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
func (c *Comm) simplexMessageToOutboundMessage(msg *simplex.Message) (message.OutboundMessage, error) { | ||||||||||
var simplexMsg *p2p.Simplex | ||||||||||
switch { | ||||||||||
case msg.VerifiedBlockMessage != nil: | ||||||||||
bytes, err := msg.VerifiedBlockMessage.VerifiedBlock.Bytes() | ||||||||||
if err != nil { | ||||||||||
return nil, fmt.Errorf("failed to serialize block: %w", err) | ||||||||||
} | ||||||||||
simplexMsg = newP2PSimplexBlockProposal(c.chainID, bytes, msg.VerifiedBlockMessage.Vote) | ||||||||||
case msg.VoteMessage != nil: | ||||||||||
simplexMsg = newP2PSimplexVote(c.chainID, msg.VoteMessage.Vote.BlockHeader, msg.VoteMessage.Signature) | ||||||||||
case msg.EmptyVoteMessage != nil: | ||||||||||
simplexMsg = newP2PSimplexEmptyVote(c.chainID, msg.EmptyVoteMessage.Vote.ProtocolMetadata, msg.EmptyVoteMessage.Signature) | ||||||||||
case msg.FinalizeVote != nil: | ||||||||||
simplexMsg = newP2PSimplexFinalizeVote(c.chainID, msg.FinalizeVote.Finalization.BlockHeader, msg.FinalizeVote.Signature) | ||||||||||
case msg.Notarization != nil: | ||||||||||
simplexMsg = newP2PSimplexNotarization(c.chainID, msg.Notarization.Vote.BlockHeader, msg.Notarization.QC.Bytes()) | ||||||||||
case msg.EmptyNotarization != nil: | ||||||||||
simplexMsg = newP2PSimplexEmptyNotarization(c.chainID, msg.EmptyNotarization.Vote.ProtocolMetadata, msg.EmptyNotarization.QC.Bytes()) | ||||||||||
case msg.Finalization != nil: | ||||||||||
simplexMsg = newP2PSimplexFinalization(c.chainID, msg.Finalization.Finalization.BlockHeader, msg.Finalization.QC.Bytes()) | ||||||||||
case msg.ReplicationRequest != nil: | ||||||||||
simplexMsg = newP2PSimplexReplicationRequest(c.chainID, msg.ReplicationRequest.Seqs, msg.ReplicationRequest.LatestRound) | ||||||||||
case msg.VerifiedReplicationResponse != nil: | ||||||||||
msg, err := newP2PSimplexReplicationResponse(c.chainID, msg.VerifiedReplicationResponse.Data, msg.VerifiedReplicationResponse.LatestRound) | ||||||||||
if err != nil { | ||||||||||
return nil, fmt.Errorf("failed to create replication response: %w", err) | ||||||||||
} | ||||||||||
simplexMsg = msg | ||||||||||
} | ||||||||||
|
||||||||||
return c.msgBuilder.SimplexMessage(simplexMsg) | ||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package simplex | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/ava-labs/simplex" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/mock/gomock" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/message" | ||
"github.com/ava-labs/avalanchego/message/messagemock" | ||
"github.com/ava-labs/avalanchego/snow/networking/sender/sendermock" | ||
"github.com/ava-labs/avalanchego/utils/constants" | ||
) | ||
|
||
var testSimplexMessage = simplex.Message{ | ||
VoteMessage: &simplex.Vote{ | ||
Vote: simplex.ToBeSignedVote{ | ||
BlockHeader: simplex.BlockHeader{ | ||
ProtocolMetadata: simplex.ProtocolMetadata{ | ||
Version: 1, | ||
Epoch: 1, | ||
Round: 1, | ||
Seq: 1, | ||
}, | ||
}, | ||
}, | ||
Signature: simplex.Signature{ | ||
Signer: []byte("dummy_node_id"), | ||
Value: []byte("dummy_signature"), | ||
}, | ||
}, | ||
} | ||
|
||
func TestCommSendMessage(t *testing.T) { | ||
config := newEngineConfig(t, 1) | ||
|
||
destinationNodeID := ids.GenerateTestNodeID() | ||
ctrl := gomock.NewController(t) | ||
sender := sendermock.NewExternalSender(ctrl) | ||
mc, err := message.NewCreator( | ||
prometheus.NewRegistry(), | ||
constants.DefaultNetworkCompressionType, | ||
10*time.Second, | ||
) | ||
require.NoError(t, err) | ||
|
||
config.OutboundMsgBuilder = mc | ||
config.Sender = sender | ||
|
||
comm, err := NewComm(config) | ||
require.NoError(t, err) | ||
|
||
sender.EXPECT().Send(gomock.Any(), gomock.Any(), comm.subnetID, gomock.Any()) | ||
|
||
comm.SendMessage(&testSimplexMessage, destinationNodeID[:]) | ||
} | ||
|
||
// TestCommBroadcast tests the Broadcast method sends to all nodes in the subnet | ||
// not including the sending node. | ||
func TestCommBroadcast(t *testing.T) { | ||
config := newEngineConfig(t, 3) | ||
|
||
ctrl := gomock.NewController(t) | ||
sender := sendermock.NewExternalSender(ctrl) | ||
mc, err := message.NewCreator( | ||
prometheus.NewRegistry(), | ||
constants.DefaultNetworkCompressionType, | ||
10*time.Second, | ||
) | ||
require.NoError(t, err) | ||
|
||
config.OutboundMsgBuilder = mc | ||
config.Sender = sender | ||
|
||
comm, err := NewComm(config) | ||
require.NoError(t, err) | ||
|
||
sender.EXPECT().Send(gomock.Any(), gomock.Any(), comm.subnetID, gomock.Any()).Times(2) | ||
|
||
comm.Broadcast(&testSimplexMessage) | ||
} | ||
|
||
func TestCommFailsWithoutCurrentNode(t *testing.T) { | ||
config := newEngineConfig(t, 3) | ||
|
||
ctrl := gomock.NewController(t) | ||
msgCreator := messagemock.NewOutboundMsgBuilder(ctrl) | ||
sender := sendermock.NewExternalSender(ctrl) | ||
|
||
config.OutboundMsgBuilder = msgCreator | ||
config.Sender = sender | ||
|
||
// set the curNode to a different nodeID than the one in the config | ||
vdrs := generateTestNodes(t, 3) | ||
config.Validators = newTestValidatorInfo(vdrs) | ||
|
||
_, err := NewComm(config) | ||
require.ErrorIs(t, err, errNodeNotFound) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR, but I have to say I am not sure I understand why this is an un-exported struct. it Makes it impossible to use anywhere in the code (for tests) and we also have only one concrete implementation of it.
@StephenButtolph do you know why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can utilize this in tests by using
message.NewCreator
.