Skip to content

Commit 839e854

Browse files
committed
feat: ✨ extend EthereumBlockNotification with reorg info
1 parent a6703ef commit 839e854

File tree

5 files changed

+152
-37
lines changed

5 files changed

+152
-37
lines changed

client/mapping-sync/src/kv/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
3636
use fp_rpc::EthereumRuntimeRPCApi;
3737

3838
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
39+
use worker::BestBlockInfo;
3940

4041
pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
4142
storage_override: Arc<dyn StorageOverride<Block>>,
@@ -155,7 +156,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
155156
pubsub_notification_sinks: Arc<
156157
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
157158
>,
158-
best_at_import: &mut HashMap<Block::Hash, <Block::Header as HeaderT>::Number>,
159+
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
159160
) -> Result<bool, String>
160161
where
161162
C: ProvideRuntimeApi<Block>,
@@ -233,10 +234,15 @@ where
233234
// This avoids race conditions where the best hash may have changed
234235
// between import and sync time (e.g., during rapid reorgs).
235236
// Fall back to current best hash check for blocks synced during catch-up.
236-
let is_new_best =
237-
best_at_import.remove(&hash).is_some() || client.info().best_hash == hash;
238-
sink.unbounded_send(EthereumBlockNotification { is_new_best, hash })
239-
.is_ok()
237+
let best_info = best_at_import.remove(&hash);
238+
let is_new_best = best_info.is_some() || client.info().best_hash == hash;
239+
let reorg_info = best_info.and_then(|info| info.reorg_info);
240+
sink.unbounded_send(EthereumBlockNotification {
241+
is_new_best,
242+
hash,
243+
reorg_info,
244+
})
245+
.is_ok()
240246
} else {
241247
// Remove from the pool if in major syncing.
242248
false
@@ -257,7 +263,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
257263
pubsub_notification_sinks: Arc<
258264
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
259265
>,
260-
best_at_import: &mut HashMap<Block::Hash, <Block::Header as HeaderT>::Number>,
266+
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
261267
) -> Result<bool, String>
262268
where
263269
C: ProvideRuntimeApi<Block>,
@@ -286,7 +292,7 @@ where
286292
// Entries for finalized blocks are no longer needed since finalized blocks
287293
// cannot be reorged and their is_new_best status is irrelevant.
288294
let finalized_number = client.info().finalized_number;
289-
best_at_import.retain(|_, block_number| *block_number > finalized_number);
295+
best_at_import.retain(|_, info| info.block_number > finalized_number);
290296

291297
Ok(synced_any)
292298
}

client/mapping-sync/src/kv/worker.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
3737
use fc_storage::StorageOverride;
3838
use fp_rpc::EthereumRuntimeRPCApi;
3939

40-
use crate::SyncStrategy;
40+
use crate::{ReorgInfo, SyncStrategy};
41+
42+
/// Information tracked at import time for a block that was `is_new_best`.
43+
pub struct BestBlockInfo<Block: BlockT> {
44+
/// The block number (for pruning purposes).
45+
pub block_number: <Block::Header as HeaderT>::Number,
46+
/// Reorg info if this block became best as part of a reorganization.
47+
pub reorg_info: Option<ReorgInfo<Block>>,
48+
}
4149

4250
pub struct MappingSyncWorker<Block: BlockT, C, BE> {
4351
import_notifications: ImportNotifications<Block>,
@@ -59,11 +67,11 @@ pub struct MappingSyncWorker<Block: BlockT, C, BE> {
5967
Arc<crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>>,
6068

6169
/// Tracks block hashes that were `is_new_best` at the time of their import notification,
62-
/// along with their block number for pruning purposes.
70+
/// along with their block number for pruning purposes and optional reorg info.
6371
/// This is used to correctly determine `is_new_best` when syncing blocks, avoiding race
6472
/// conditions where the best hash may have changed between import and sync time.
6573
/// Entries are pruned when blocks become finalized to prevent unbounded growth.
66-
best_at_import: HashMap<Block::Hash, <Block::Header as HeaderT>::Number>,
74+
best_at_import: HashMap<Block::Hash, BestBlockInfo<Block>>,
6775
}
6876

6977
impl<Block: BlockT, C, BE> Unpin for MappingSyncWorker<Block, C, BE> {}
@@ -126,10 +134,33 @@ where
126134
fire = true;
127135
// Track blocks that were `is_new_best` at import time to avoid race
128136
// conditions when determining `is_new_best` at sync time.
129-
// We store the block number to enable pruning of old entries.
137+
// We store the block number to enable pruning of old entries,
138+
// and reorg info if this block became best as part of a reorg.
130139
if notification.is_new_best {
131-
self.best_at_import
132-
.insert(notification.hash, *notification.header.number());
140+
let reorg_info = notification.tree_route.as_ref().map(|tree_route| {
141+
let retracted = tree_route
142+
.retracted()
143+
.iter()
144+
.map(|hash_and_number| hash_and_number.hash)
145+
.collect();
146+
let enacted = tree_route
147+
.enacted()
148+
.iter()
149+
.map(|hash_and_number| hash_and_number.hash)
150+
.collect();
151+
ReorgInfo {
152+
common_ancestor: tree_route.common_block().hash,
153+
retracted,
154+
enacted,
155+
}
156+
});
157+
self.best_at_import.insert(
158+
notification.hash,
159+
BestBlockInfo {
160+
block_number: *notification.header.number(),
161+
reorg_info,
162+
},
163+
);
133164
}
134165
}
135166
Poll::Ready(None) => return Poll::Ready(None),

client/mapping-sync/src/lib.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,26 @@ pub enum SyncStrategy {
3434
pub type EthereumBlockNotificationSinks<T> =
3535
parking_lot::Mutex<Vec<sc_utils::mpsc::TracingUnboundedSender<T>>>;
3636

37-
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
37+
/// Information about a chain reorganization.
38+
///
39+
/// When a reorg occurs, this struct contains the blocks that were removed from
40+
/// the canonical chain (retracted) and the blocks that were added (enacted).
41+
/// The `common_ancestor` is the last block that remains canonical in both
42+
/// the old and new chains.
43+
#[derive(Clone, Debug, Eq, PartialEq)]
44+
pub struct ReorgInfo<Block: BlockT> {
45+
/// The common ancestor block hash between the old and new canonical chains.
46+
pub common_ancestor: Block::Hash,
47+
/// Blocks that were removed from the canonical chain (old fork).
48+
pub retracted: Vec<Block::Hash>,
49+
/// Blocks that were added to the canonical chain (new fork).
50+
pub enacted: Vec<Block::Hash>,
51+
}
52+
53+
#[derive(Clone, Debug, Eq, PartialEq)]
3854
pub struct EthereumBlockNotification<Block: BlockT> {
3955
pub is_new_best: bool,
4056
pub hash: Block::Hash,
57+
/// Optional reorg information. Present when this block became best as part of a reorg.
58+
pub reorg_info: Option<ReorgInfo<Block>>,
4159
}

client/mapping-sync/src/sql/mod.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,20 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto
2929
// Frontier
3030
use fp_rpc::EthereumRuntimeRPCApi;
3131

32-
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
32+
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy};
3333

3434
/// Defines the commands for the sync worker.
3535
#[derive(Debug)]
36-
pub enum WorkerCommand {
36+
pub enum WorkerCommand<Block: BlockT> {
3737
/// Resume indexing from the last indexed canon block.
3838
ResumeSync,
3939
/// Index leaves.
4040
IndexLeaves(Vec<H256>),
4141
/// Index the best block known so far via import notifications.
42-
IndexBestBlock(H256),
42+
IndexBestBlock {
43+
block_hash: H256,
44+
reorg_info: Option<ReorgInfo<Block>>,
45+
},
4346
/// Canonicalize the enacted and retracted blocks reported via import notifications.
4447
Canonicalize {
4548
common: H256,
@@ -80,7 +83,7 @@ where
8083
pubsub_notification_sinks: Arc<
8184
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
8285
>,
83-
) -> tokio::sync::mpsc::Sender<WorkerCommand> {
86+
) -> tokio::sync::mpsc::Sender<WorkerCommand<Block>> {
8487
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
8588
tokio::task::spawn(async move {
8689
while let Some(cmd) = rx.recv().await {
@@ -122,7 +125,10 @@ where
122125
.await;
123126
}
124127
}
125-
WorkerCommand::IndexBestBlock(block_hash) => {
128+
WorkerCommand::IndexBestBlock {
129+
block_hash,
130+
reorg_info,
131+
} => {
126132
index_canonical_block_and_ancestors(
127133
client.clone(),
128134
substrate_backend.clone(),
@@ -135,6 +141,7 @@ where
135141
let _ = sink.unbounded_send(EthereumBlockNotification {
136142
is_new_best: true,
137143
hash: block_hash,
144+
reorg_info: reorg_info.clone(),
138145
});
139146
}
140147
}
@@ -229,7 +236,8 @@ where
229236
notification.is_new_best,
230237
);
231238
if notification.is_new_best {
232-
if let Some(tree_route) = notification.tree_route {
239+
// Extract reorg info from tree_route if present
240+
let reorg_info = if let Some(ref tree_route) = notification.tree_route {
233241
log::debug!(
234242
target: "frontier-sql",
235243
"🔀 Re-org happened at new best {}, proceeding to canonicalize db",
@@ -249,12 +257,23 @@ where
249257
let common = tree_route.common_block().hash;
250258
tx.send(WorkerCommand::Canonicalize {
251259
common,
252-
enacted,
253-
retracted,
260+
enacted: enacted.clone(),
261+
retracted: retracted.clone(),
254262
}).await.ok();
255-
}
256263

257-
tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok();
264+
Some(ReorgInfo {
265+
common_ancestor: common,
266+
retracted,
267+
enacted,
268+
})
269+
} else {
270+
None
271+
};
272+
273+
tx.send(WorkerCommand::IndexBestBlock {
274+
block_hash: notification.hash,
275+
reorg_info,
276+
}).await.ok();
258277
}
259278
}
260279
}

client/rpc/src/eth_pubsub.rs

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::{marker::PhantomData, sync::Arc};
2121
use ethereum::TransactionV3 as EthereumTransaction;
2222
use futures::{future, FutureExt as _, StreamExt as _};
2323
use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
24+
use log::debug;
2425
// Substrate
2526
use sc_client_api::{
2627
backend::{Backend, StorageProvider},
@@ -118,16 +119,21 @@ where
118119
}
119120
}
120121

121-
fn notify_header(
122-
&self,
123-
notification: EthereumBlockNotification<B>,
124-
) -> future::Ready<Option<PubSubResult>> {
125-
let res = if notification.is_new_best {
126-
self.storage_override.current_block(notification.hash)
127-
} else {
128-
None
129-
};
130-
future::ready(res.map(PubSubResult::header))
122+
/// Get headers for enacted blocks during a reorg.
123+
///
124+
/// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads):
125+
/// "When a chain reorganization occurs, this subscription will emit an event
126+
/// containing all new headers (blocks) for the new chain. This means that you
127+
/// may see multiple headers emitted with the same height (block number)."
128+
///
129+
/// Returns headers in ascending order (oldest first).
130+
/// Note: `enacted` from tree_route already includes the new best block.
131+
fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec<PubSubResult> {
132+
enacted
133+
.iter()
134+
.filter_map(|hash| self.storage_override.current_block(*hash))
135+
.map(PubSubResult::header)
136+
.collect()
131137
}
132138

133139
fn notify_logs(
@@ -252,10 +258,45 @@ where
252258
let fut = async move {
253259
match kind {
254260
Kind::NewHeads => {
255-
let stream = block_notification_stream
256-
.filter_map(move |notification| pubsub.notify_header(notification));
261+
// Per Ethereum spec, when a reorg occurs, we must emit all headers
262+
// for the new canonical chain. The reorg_info field in the notification
263+
// contains the enacted blocks when a reorg occurred.
264+
let stream = block_notification_stream.filter_map(move |notification| {
265+
if !notification.is_new_best {
266+
return future::ready(None);
267+
}
268+
269+
// Check if this block came from a reorg
270+
let headers = if let Some(ref reorg_info) = notification.reorg_info {
271+
debug!(
272+
target: "eth-pubsub",
273+
"Reorg detected: {} blocks retracted, {} blocks enacted",
274+
reorg_info.retracted.len(),
275+
reorg_info.enacted.len()
276+
);
277+
// Emit all enacted blocks (already includes the new best block)
278+
pubsub.get_enacted_headers(&reorg_info.enacted)
279+
} else {
280+
// Normal case: just emit the new block
281+
if let Some(block) = pubsub.storage_override.current_block(notification.hash) {
282+
vec![PubSubResult::header(block)]
283+
} else {
284+
return future::ready(None);
285+
}
286+
};
287+
288+
if headers.is_empty() {
289+
return future::ready(None);
290+
}
291+
292+
future::ready(Some(headers))
293+
});
294+
295+
// Flatten the Vec<PubSubResult> into individual PubSubResult items
296+
let flat_stream = stream.flat_map(futures::stream::iter);
297+
257298
PendingSubscription::from(pending)
258-
.pipe_from_stream(stream, BoundedVecDeque::new(16))
299+
.pipe_from_stream(flat_stream, BoundedVecDeque::new(16))
259300
.await
260301
}
261302
Kind::Logs => {

0 commit comments

Comments
 (0)