Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions crates/iroha_core/src/peers_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl PeersGossiperHandle {

/// Actor which gossips peers addresses.
pub struct PeersGossiper {
/// Id of the current peer
peer_id: PeerId,
/// Peers provided at startup
initial_peers: BTreeMap<PeerId, SocketAddr>,
/// Peers received via gossiping from other peers
Expand All @@ -68,6 +70,7 @@ pub struct PeersGossiper {
impl PeersGossiper {
/// Start actor.
pub fn start(
peer_id: PeerId,
trusted_peers: TrustedPeers,
network: IrohaNetwork,
shutdown_signal: ShutdownSignal,
Expand All @@ -78,6 +81,7 @@ impl PeersGossiper {
.map(|peer| (peer.id, peer.address))
.collect();
let gossiper = Self {
peer_id,
initial_peers,
gossip_peers: BTreeMap::new(),
current_topology: BTreeSet::new(),
Expand Down Expand Up @@ -176,16 +180,17 @@ impl PeersGossiper {

let mut peers = Vec::new();
for (id, address) in &self.initial_peers {
if !online_peers_ids.contains(id) {
peers.push((id.clone(), address.clone()));
}
peers.push((id.clone(), address.clone()));
}
for (id, addresses) in &self.gossip_peers {
if !online_peers_ids.contains(id) {
peers.push((id.clone(), choose_address_majority_rule(addresses)));
}
peers.push((id.clone(), choose_address_majority_rule(addresses)));
}

let peers = peers
.into_iter()
.filter(|(id, _)| !online_peers_ids.contains(id) && id != &self.peer_id)
.collect();

let update = UpdatePeers(peers);
self.network.update_peers_addresses(update);
}
Expand Down
2 changes: 2 additions & 0 deletions crates/iroha_p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
current_conn_id: ConnectionId,
/// Current topology
current_topology: HashSet<PeerId>,
/// Peers which are not yet connected, but should.
///
/// Can have two addresses for same `PeerId`.
/// * One initially provided via config
/// * Second received from other peers via gossiping
Expand Down
35 changes: 33 additions & 2 deletions crates/iroha_p2p/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Tokio actor Peer

use std::net::SocketAddr;

use bytes::{Buf, BufMut, BytesMut};
use message::*;
use parity_scale_codec::{DecodeAll, Encode};
Expand Down Expand Up @@ -110,7 +112,7 @@ mod run {

/// Peer task.
#[allow(clippy::too_many_lines)]
#[log(skip_all, fields(conn_id = peer.connection_id(), peer, disambiguator))]
#[log(skip_all, fields(connection = &peer.log_description(), conn_id = peer.connection_id(), peer, disambiguator))]
pub(super) async fn run<T: Pload, K: Kex, E: Enc, P: Entrypoint<K, E>>(
RunPeerArgs {
peer,
Expand Down Expand Up @@ -145,6 +147,7 @@ mod run {
read,
write,
id: connection_id,
..
},
cryptographer,
} = ready_peer;
Expand Down Expand Up @@ -298,18 +301,38 @@ mod run {
/// Trait for peer stages that might be used as starting point for peer's [`run`] function.
pub(super) trait Entrypoint<K: Kex, E: Enc>: Handshake<K, E> + Send + 'static {
fn connection_id(&self) -> ConnectionId;

/// Debug description, used for logging
fn log_description(&self) -> String;
}

impl<K: Kex, E: Enc> Entrypoint<K, E> for Connecting {
fn connection_id(&self) -> ConnectionId {
self.connection_id
}

fn log_description(&self) -> String {
format!("outgoing to {}", self.peer_addr)
}
}

impl<K: Kex, E: Enc> Entrypoint<K, E> for ConnectedFrom {
fn connection_id(&self) -> ConnectionId {
self.connection.id
}

fn log_description(&self) -> String {
#[allow(clippy::option_if_let_else)]
match self.connection.remote_addr {
None => "incoming".to_owned(),
Some(remote_addr) => {
// In case of incoming connection,
// only host will have some meaningful value.
// Port will have some random value chosen only for this connection.
format!("incoming from {}", remote_addr.ip())
}
}
}
}

/// Cancellation-safe way to read messages from tcp stream
Expand Down Expand Up @@ -859,12 +882,20 @@ pub struct Connection {
pub read: OwnedReadHalf,
/// Writing half of `TcpStream`
pub write: OwnedWriteHalf,
/// Remote addr, for logging purpose.
pub remote_addr: Option<SocketAddr>,
}

impl Connection {
/// Instantiate new connection from `connection_id` and `stream`.
pub fn new(id: ConnectionId, stream: TcpStream) -> Self {
let remote_addr = stream.peer_addr().ok();
let (read, write) = stream.into_split();
Connection { id, read, write }
Connection {
id,
read,
write,
remote_addr,
}
}
}
1 change: 1 addition & 0 deletions crates/irohad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl Iroha {
);

let (peers_gossiper, child) = PeersGossiper::start(
config.common.peer.id.clone(),
config.common.trusted_peers.value().clone(),
network.clone(),
supervisor.shutdown_signal(),
Expand Down
Loading