Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 40 additions & 104 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,6 @@ pub enum BlockProcessStatus<E: EthSpec> {
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
}

pub struct BeaconChainMetrics {
pub reqresp_pre_import_cache_len: usize,
}

pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
Expand All @@ -363,9 +359,6 @@ pub type BeaconStore<T> = Arc<
>,
>;

/// Cache gossip verified blocks to serve over ReqResp before they are imported
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
Expand Down Expand Up @@ -462,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// Cache gossip verified blocks to serve over ReqResp before they are imported
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
Expand Down Expand Up @@ -1289,18 +1280,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts.
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
if let Some(block) = self
.data_availability_checker
.get_execution_valid_block(block_root)
{
return BlockProcessStatus::ExecutionValidated(block);
}

if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return BlockProcessStatus::NotValidated(block.clone());
if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) {
return cached_block;
}

BlockProcessStatus::Unknown
Expand Down Expand Up @@ -3054,8 +3035,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));

let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
self.check_gossip_blob_availability_and_import(blob).await
}

/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3092,15 +3072,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_columns.iter().map(|column| column.as_data_column()),
);

let r = self
.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await;
self.remove_notified(&block_root, r)
self.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3139,10 +3117,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));

let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
self.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await
}

/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
Expand Down Expand Up @@ -3174,10 +3150,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

let r = self
.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await;
self.remove_notified(&block_root, r)
self.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await
}

fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
Expand Down Expand Up @@ -3270,10 +3244,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
custody_columns.iter().map(|column| column.as_ref()),
);

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await
}

pub async fn reconstruct_data_columns(
Expand Down Expand Up @@ -3320,10 +3292,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(None);
};

let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
self.process_availability(slot, availability, || Ok(()))
.await
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
Expand All @@ -3340,46 +3310,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still need to handle the case where block import was errored and remove them from cache.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downside of leaving the invalid block in the cache is that we might serve the block over RPC after knowing it's invalid. Consider the case where we remove the block from the da_checker after noticing it's invalid and then someone triggers lookup sync and we fetch it again. We will serve the block over RPC while verification is happening for the second time.

I don't see that big of an issue with leaving the invalid block in the da_checker until LRU evicts it. Otherwise, we can leave it there but mark it as "INVALID"

Copy link
Member Author

@jimmygchen jimmygchen Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud -

If the block is invalid we will downscore the peer.

If we leave the invalid block in the cache

  • we might serve the invalid block over RPC (we already are doing it before execution) - however, we are unlikely to get downscored because we will never attest or build on top of this block.
  • we avoid downloading the invalid block again, even if a peer sends us a child block / attestation

If we remove the invalid block from the cache

  • we still serve the invalid block over RPC before execution, but NOT after it failed validation
  • a peer might get us to fetch it again and serve us the invalid block; we will downscore them after execution
  • it doesn't really give us much because we could still serve the invalid block over RPC before execution, and even if we remove it, it could come back again

I don't see that big of an issue with leaving the invalid block in the da_checker until LRU evicts it.

Yes agree. I actually think leaving it in the cache to prevent the lookup is better.

Otherwise, we can leave it there but mark it as "INVALID"

A block that does not transition into executed block will never become available so we don't really need to mark them I believe.

Copy link
Member Author

@jimmygchen jimmygchen Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising these good points. I'm now inclined to leave it as it is: removing it doesn't seem to help us that much, and the small benefit doesn't justify the extra code and complexity.

Copy link
Member Author

@jimmygchen jimmygchen Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding more thoughts after seeing the failed block_in_processing_cache_becomes_invalid test.

If the block fails execution initially for whatever reason (e.g. engine offline), and we keep it in the cache, then there's a chance that the chain might get stuck, because we will never perform lookup until the block is evicted from the cache. I think this is not great, I'll add code to remove it so we re-trigger a new lookup in this case.

If it is indeed an invalid block, the peer will get penalised for getting us to do the lookup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing failed execution block in 5f2e85c

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will never perform lookup until the block is evicted from the cache

Just to be clear, is this because we cannot re-trigger the execution of a block sitting in the da checker? not fully clear on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly - because lookup will not re-download the block if it's in the DA checker:

match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {
return Ok(LookupRequestResult::NoRequestNeeded(
"block execution validated",
));
}
}

So if it fails the first time because of an unexpected error, e.g. an EL timeout, then it doesn't get re-downloaded until we fall back to range sync.

&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
block_source: BlockImportSource,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());

let r = self
.process_block(
block_root,
unverified_block,
notify_execution_layer,
block_source,
|| Ok(()),
)
.await;
self.remove_notified(&block_root, r)
}

/// Check for known and configured invalid block roots before processing.
pub fn check_invalid_block_roots(&self, block_root: Hash256) -> Result<(), BlockError> {
if self.config.invalid_block_roots.contains(&block_root) {
Expand Down Expand Up @@ -3411,12 +3341,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

let block_slot = unverified_block.block().slot();

// Set observed time if not already set. Usually this should be set by gossip or RPC,
Expand All @@ -3431,6 +3355,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

self.data_availability_checker
.put_pre_execution_block(block_root, unverified_block.block_cloned())?;
Copy link
Collaborator

@dapplion dapplion Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that the block is unverified at this point, so it can be 10 MB. The current max size of the cache is 32 blocks so there's no risk of OOM.


// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
Expand All @@ -3448,7 +3381,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.set_time_consensus_verified(block_root, block_slot, timestamp)
}

let executed_block = chain.into_executed_block(execution_pending).await?;
let executed_block = chain
.into_executed_block(execution_pending)
.await
.inspect_err(|_| {
// If the block fails execution for whatever reason (e.g. engine offline),
// and we keep it in the cache, then the node will NOT perform lookup and
// reprocess this block until the block is evicted from DA checker, causing the
// chain to get stuck temporarily if the block is canonical. Therefore we remove
// it from the cache if execution fails.
self.data_availability_checker
.remove_block_on_execution_error(&block_root);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's safer to just chuck it out of the cache and let lookup fetch the block regardless of the type of the execution error.

})?;

// Record the *additional* time it took to wait for execution layer verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
Expand Down Expand Up @@ -3574,9 +3518,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = block.block.slot();
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
let availability = self.data_availability_checker.put_executed_block(block)?;
self.process_availability(slot, availability, || Ok(()))
.await
}
Expand Down Expand Up @@ -7156,12 +7098,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

pub fn metrics(&self) -> BeaconChainMetrics {
BeaconChainMetrics {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}

pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,6 @@ where
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
light_client_server_cache: LightClientServerCache::new(),
light_client_server_tx: self.light_client_server_tx,
shutdown_sender: self
Expand Down
38 changes: 28 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::block_verification_types::{
use crate::data_availability_checker::overflow_lru_cache::{
DataAvailabilityCheckerInner, ReconstructColumnsDecision,
};
use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics};
use crate::{
BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics,
};
use kzg::Kzg;
use slot_clock::SlotClock;
use std::fmt;
Expand All @@ -27,6 +29,7 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_availability_checker::error::Error;
use crate::data_column_verification::{
CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn,
KzgVerifiedDataColumn, verify_kzg_for_data_column_list,
Expand Down Expand Up @@ -141,14 +144,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self.custody_context
}

/// Checks if the block root is currenlty in the availability cache awaiting import because
/// Checks if the block root is currently in the availability cache awaiting import because
/// of missing components.
pub fn get_execution_valid_block(
&self,
block_root: &Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.availability_cache
.get_execution_valid_block(block_root)
///
/// Returns the cached block wrapped in a `BlockProcessStatus` enum if it exists.
pub fn get_cached_block(&self, block_root: &Hash256) -> Option<BlockProcessStatus<T::EthSpec>> {
self.availability_cache.get_cached_block(block_root)
}

/// Return the set of cached blob indexes for `block_root`. Returns None if there is no block
Expand Down Expand Up @@ -337,12 +338,29 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
pub fn put_executed_block(
&self,
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache.put_executed_block(executed_block)
}

/// Inserts a pre-execution block into the cache.
/// This does NOT override an existing executed block.
///
/// This is a thin wrapper: `block_root` and `block` are forwarded unchanged to
/// `availability_cache.put_pre_execution_block`, and its result is returned as-is.
///
/// # Errors
///
/// Returns whatever `Error` the underlying availability cache reports for the insertion.
pub fn put_pre_execution_block(
&self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<(), Error> {
self.availability_cache
.put_pre_execution_block(block_root, block)
}

/// Removes a pre-execution block from the cache.
/// This does NOT remove an existing executed block.
pub fn remove_block_on_execution_error(&self, block_root: &Hash256) {
self.availability_cache
.put_pending_executed_block(executed_block)
.remove_pre_execution_block(block_root);
}

/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
Expand Down
Loading
Loading