Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }

# Cumulus
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }

[dev-dependencies]
Expand Down
72 changes: 53 additions & 19 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
Expand All @@ -27,15 +26,25 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};

use cumulus_primitives_core::{RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};

use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{select, FutureExt, Stream, StreamExt};
use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc};
use std::{pin::Pin, sync::Arc, time::Duration};

const LOG_TARGET: &str = "cumulus-consensus";

// Delay range to trigger explicit requests.
// The chosen value doesn't have any special meaning, a random delay within the order of
// seconds in practice should be a good enough to allow a quick recovery without DOSing
// the relay chain.
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
Expand Down Expand Up @@ -82,15 +91,15 @@ where
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};

let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
Expand All @@ -105,12 +114,12 @@ where
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
Expand All @@ -136,6 +145,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
Expand All @@ -148,8 +158,13 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let follow_new_best =
follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
recovery_chan_tx,
);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
Expand All @@ -163,6 +178,7 @@ async fn follow_new_best<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
Expand Down Expand Up @@ -197,10 +213,11 @@ async fn follow_new_best<P, R, Block, B>(
h,
&*parachain,
&mut unset_best_header,
recovery_chan_tx.clone(),
).await,
None => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
"Stopping following new best.",
);
return
Expand All @@ -217,7 +234,7 @@ async fn follow_new_best<P, R, Block, B>(
).await,
None => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
Expand Down Expand Up @@ -276,7 +293,7 @@ async fn handle_new_block_imported<Block, P>(
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
?unset_best_header,
?notification.header,
?state,
Expand All @@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
unset_best_header: &mut Option<Block::Header>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
Expand All @@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head<Block, P>(
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
error = ?err,
"Could not decode Parachain header while following best heads.",
);
Expand All @@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head<Block, P>(

if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
Expand All @@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head<Block, P>(
},
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
Expand All @@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head<Block, P>(
*unset_best_header = Some(parachain_head);

tracing::debug!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);

if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
// Best effort channel to actively encourage block recovery.
// An error here is not fatal; the relay chain continuously re-announces
// the best block, thus we will have other opportunities to retry.
let req =
RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
if let Err(err) = recovery_chan_tx.try_send(req) {
tracing::warn!(
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Unable to notify block recovery subsystem"
)
}
}
},
Err(e) => {
tracing::error!(
target: "cumulus-collator",
target: LOG_TARGET,
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
Expand All @@ -361,7 +395,7 @@ where
let best_number = parachain.usage_info().chain.best_number;
if *header.number() < best_number {
tracing::debug!(
target: "cumulus-consensus",
target: LOG_TARGET,
%best_number,
block_number = %header.number(),
"Skipping importing block as new best block, because there already exists a \
Expand All @@ -377,7 +411,7 @@ where

if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
tracing::warn!(
target: "cumulus-consensus",
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
Expand Down
Loading