Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 4386c7e

Browse files
committed
feat bitfield distribution
1 parent 8348cc4 commit 4386c7e

File tree

4 files changed

+191
-0
lines changed

4 files changed

+191
-0
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ members = [
4343
"service",
4444
"validation",
4545

46+
"node/availability/bitfield-distribution",
4647
"node/core/proposer",
4748
"node/network/bridge",
4849
"node/network/statement-distribution",
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "polkadot-availability-bitfield-distribution"
3+
version = "0.1.0"
4+
authors = ["Parity Technologies <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
futures = "0.3.5"
9+
log = "0.4.8"
10+
polkadot-primitives = { path = "../../../primitives" }
11+
polkadot-node-subsystem = { path = "../../subsystem" }
12+
polkadot-network-bridge = { path = "../../network/bridge" }
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2020 Parity Technologies (UK) Ltd.
2+
// This file is part of Polkadot.
3+
4+
// Polkadot is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Polkadot is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//! The bitfield distribution subsystem spreading @todo .
18+
19+
use bridge::NetworkBridgeMessage;
20+
use futures::{channel::oneshot, Future};
21+
use node_primitives::{ProtocolId, SignedFullStatement, View};
22+
use polkadot_node_subsystem::{
23+
messages::{AllMessages, BitfieldDistributionMessage},
24+
OverseerSignal, SubsystemResult,
25+
};
26+
use polkadot_node_subsystem::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
27+
use polkadot_primitives::Hash;
28+
use std::{collections::HashMap, pin::Pin};
29+
30+
// @todo split in multiple costs
31+
const COST_UNEXPECTED: Rep = Rep::new(-100, "Unexpected");
32+
33+
#[derive(Default, Clone)]
34+
struct Tracker {
35+
// track all active peers and their views
36+
// to determine what is relevant to them
37+
peer_views: HashMap<PeerId, View>,
38+
39+
// our current view
40+
view: View,
41+
}
42+
43+
44+
45+
fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
46+
AllMessages::BitfieldDistribution(BitfieldDistributionMessage::NetworkBridgeUpdate(n))
47+
}
48+
49+
pub struct BitfieldDistribution;
50+
51+
impl BitfieldDistribution {
52+
const PROTOCOL_ID: ProtocolId = *b"bitd";
53+
54+
async fn run<Context>(mut ctx: Context) -> SubsystemResult<()>
55+
where
56+
Context: SubsystemContext<Message = BitfieldSigningMessage>,
57+
{
58+
// startup: register the network protocol with the bridge.
59+
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer(
60+
Self::PROTOCOL_ID,
61+
handle_network_msg,
62+
))).await?;
63+
64+
let mut data = Tracker::default();
65+
loop {
66+
{
67+
let x = ctx.recv().await?;
68+
match x {
69+
FromOverseer::Communication { msg: _ } => {
70+
unreachable!("BitfieldDistributionMessage does not exist; qed")
71+
}
72+
FromOverseer::Signal(OverseerSignal::StartWork(hash)) => {
73+
// @todo cannot work
74+
// tracker.active_heads.insert(hash.clone(), process(&mut data, hash));
75+
}
76+
FromOverseer::Signal(OverseerSignal::StopWork(hash)) => {
77+
// could work, but see above
78+
tracker.active_heads.remove(&hash);
79+
}
80+
FromOverseer::Signal(OverseerSignal::Conclude) => break,
81+
}
82+
}
83+
active_jobs.retain(|_, future| future.poll().is_pending());
84+
}
85+
Ok(())
86+
}
87+
}
88+
89+
/// Handle an incoming message
90+
async fn process_incoming(
91+
tracker: &mut Tracker,
92+
message: BitfieldDistributionMessage,
93+
) -> SubsystemResult<()> {
94+
match message {
95+
/// Distribute a bitfield via gossip to other validators.
96+
BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability) => {
97+
// @todo check signature, where to get the SingingContext<Hash> from?
98+
// signed_availability.check_signature(signing_ctx, validator_id)?;
99+
100+
for (peerid, view) in tracker.peer_views.filter(|(_peerid,view)| {
101+
view.contains(hash)
102+
}) {
103+
// @todo verify sequential execution is ok or if spawning tasks is better
104+
105+
106+
}
107+
}
108+
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
109+
handle_network_msg(
110+
&mut tracker,
111+
&mut ctx,
112+
event,
113+
)
114+
.await?
115+
}
116+
}
117+
Ok(())
118+
}
119+
120+
/// Deal with network bridge updates and track what needs to be tracked
121+
async fn handle_network_msg(
122+
mut ctx: impl SubsystemContext<Message = BitfieldDistributionMessage>,
123+
tracker: &mut Tracker,
124+
bridge_event: NetworkBridgeMessage,
125+
) -> SubsystemResult<()> {
126+
match bridge_message {
127+
NetworkBridgeMessage::PeerConnected(peerid, _role) => {
128+
// insert if none already present
129+
tracker.peer_views.entry(peerid).or_insert(View::default());
130+
}
131+
NetworkBridgeMessage::PeerDisconnected(peerid) => {
132+
// get rid of superfluous data
133+
tracker.peer_views.remove(peerid);
134+
}
135+
NetworkBridgeMessage::PeerViewChange(peerid, view) => {
136+
tracker.peer_views.entry(peerid).modify(|val| {
137+
*val = view
138+
});
139+
140+
},
141+
NetworkBridgeEvent::OurViewChange(view) => {
142+
let old_view = std::mem::replace(tracker.view, view);
143+
tracker
144+
.active_heads
145+
.retain(|head, _| tracker.view.contains(head));
146+
147+
for new in tracker.view.difference(&old_view) {
148+
if !tracker.active_heads.contains_key(&new) {
149+
log::warn!("Active head running that's not active anymore, go catch it") //@todo rephrase
150+
//@todo should we get rid of that right here
151+
}
152+
}
153+
}
154+
}
155+
Ok(())
156+
}
157+
158+
impl<C> Subsystem<C> for BitfieldDistribution
159+
where
160+
C: SubsystemContext<Message = BitfieldDistributionMessage>,
161+
{
162+
fn start(self, ctx: C) -> SpawnedSubsystem {
163+
SpawnedSubsystem(Box::pin(async move {
164+
Self::run(ctx).await;
165+
}))
166+
}
167+
}

0 commit comments

Comments
 (0)