Skip to content

Commit dde2b5a

Browse files
committed
fix: 🐛 extend EthereumBlockNotification with reorg info to allow compliance with Ethereum specs
1 parent 6fb4a31 commit dde2b5a

File tree

5 files changed

+214
-33
lines changed

5 files changed

+214
-33
lines changed

client/mapping-sync/src/kv/mod.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod worker;
2222

2323
pub use worker::MappingSyncWorker;
2424

25-
use std::sync::Arc;
25+
use std::{collections::HashMap, sync::Arc};
2626

2727
// Substrate
2828
use sc_client_api::backend::{Backend, StorageProvider};
@@ -36,6 +36,7 @@ use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
3636
use fp_rpc::EthereumRuntimeRPCApi;
3737

3838
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
39+
use worker::BestBlockInfo;
3940

4041
pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
4142
storage_override: Arc<dyn StorageOverride<Block>>,
@@ -155,6 +156,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
155156
pubsub_notification_sinks: Arc<
156157
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
157158
>,
159+
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
158160
) -> Result<bool, String>
159161
where
160162
C: ProvideRuntimeApi<Block>,
@@ -223,14 +225,24 @@ where
223225
.write_current_syncing_tips(current_syncing_tips)?;
224226
}
225227
// Notify on import and remove closed channels.
226-
// Only notify when the node is node in major syncing.
228+
// Only notify when the node is not in major syncing.
227229
let sinks = &mut pubsub_notification_sinks.lock();
228230
sinks.retain(|sink| {
229231
if !sync_oracle.is_major_syncing() {
230232
let hash = operating_header.hash();
231-
let is_new_best = client.info().best_hash == hash;
232-
sink.unbounded_send(EthereumBlockNotification { is_new_best, hash })
233-
.is_ok()
233+
// Use the `is_new_best` status from import time if available.
234+
// This avoids race conditions where the best hash may have changed
235+
// between import and sync time (e.g., during rapid reorgs).
236+
// Fall back to current best hash check for blocks synced during catch-up.
237+
let best_info = best_at_import.remove(&hash);
238+
let is_new_best = best_info.is_some() || client.info().best_hash == hash;
239+
let reorg_info = best_info.and_then(|info| info.reorg_info);
240+
sink.unbounded_send(EthereumBlockNotification {
241+
is_new_best,
242+
hash,
243+
reorg_info,
244+
})
245+
.is_ok()
234246
} else {
235247
// Remove from the pool if in major syncing.
236248
false
@@ -251,6 +263,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
251263
pubsub_notification_sinks: Arc<
252264
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
253265
>,
266+
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
254267
) -> Result<bool, String>
255268
where
256269
C: ProvideRuntimeApi<Block>,
@@ -271,9 +284,16 @@ where
271284
strategy,
272285
sync_oracle.clone(),
273286
pubsub_notification_sinks.clone(),
287+
best_at_import,
274288
)?;
275289
}
276290

291+
// Prune old entries from best_at_import to prevent unbounded growth.
292+
// Entries for finalized blocks are no longer needed since finalized blocks
293+
// cannot be reorged and their is_new_best status is irrelevant.
294+
let finalized_number = client.info().finalized_number;
295+
best_at_import.retain(|_, info| info.block_number > finalized_number);
296+
277297
Ok(synced_any)
278298
}
279299

client/mapping-sync/src/kv/worker.rs

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// You should have received a copy of the GNU General Public License
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

19-
use std::{pin::Pin, sync::Arc, time::Duration};
19+
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2020

2121
use futures::{
2222
prelude::*,
@@ -37,7 +37,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
3737
use fc_storage::StorageOverride;
3838
use fp_rpc::EthereumRuntimeRPCApi;
3939

40-
use crate::SyncStrategy;
40+
use crate::{ReorgInfo, SyncStrategy};
41+
42+
/// Information tracked at import time for a block that was `is_new_best`.
43+
pub struct BestBlockInfo<Block: BlockT> {
44+
/// The block number (for pruning purposes).
45+
pub block_number: <Block::Header as HeaderT>::Number,
46+
/// Reorg info if this block became best as part of a reorganization.
47+
pub reorg_info: Option<ReorgInfo<Block>>,
48+
}
4149

4250
pub struct MappingSyncWorker<Block: BlockT, C, BE> {
4351
import_notifications: ImportNotifications<Block>,
@@ -57,6 +65,13 @@ pub struct MappingSyncWorker<Block: BlockT, C, BE> {
5765
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
5866
pubsub_notification_sinks:
5967
Arc<crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>>,
68+
69+
/// Tracks block hashes that were `is_new_best` at the time of their import notification,
70+
/// along with their block number for pruning purposes and optional reorg info.
71+
/// This is used to correctly determine `is_new_best` when syncing blocks, avoiding race
72+
/// conditions where the best hash may have changed between import and sync time.
73+
/// Entries are pruned when blocks become finalized to prevent unbounded growth.
74+
best_at_import: HashMap<Block::Hash, BestBlockInfo<Block>>,
6075
}
6176

6277
impl<Block: BlockT, C, BE> Unpin for MappingSyncWorker<Block, C, BE> {}
@@ -94,6 +109,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
94109

95110
sync_oracle,
96111
pubsub_notification_sinks,
112+
best_at_import: HashMap::new(),
97113
}
98114
}
99115
}
@@ -114,8 +130,42 @@ where
114130
loop {
115131
match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
116132
Poll::Pending => break,
117-
Poll::Ready(Some(_)) => {
133+
Poll::Ready(Some(notification)) => {
118134
fire = true;
135+
// Track blocks that were `is_new_best` at import time to avoid race
136+
// conditions when determining `is_new_best` at sync time.
137+
// We store the block number to enable pruning of old entries,
138+
// and reorg info if this block became best as part of a reorg.
139+
if notification.is_new_best {
140+
let reorg_info = notification.tree_route.as_ref().map(|tree_route| {
141+
let retracted = tree_route
142+
.retracted()
143+
.iter()
144+
.map(|hash_and_number| hash_and_number.hash)
145+
.collect();
146+
// tree_route.enacted() returns blocks from common ancestor (exclusive)
147+
// to new best (EXCLUSIVE). We need to include the new best block
148+
// (notification.hash) to get the complete enacted list.
149+
let mut enacted: Vec<_> = tree_route
150+
.enacted()
151+
.iter()
152+
.map(|hash_and_number| hash_and_number.hash)
153+
.collect();
154+
enacted.push(notification.hash);
155+
ReorgInfo {
156+
common_ancestor: tree_route.common_block().hash,
157+
retracted,
158+
enacted,
159+
}
160+
});
161+
self.best_at_import.insert(
162+
notification.hash,
163+
BestBlockInfo {
164+
block_number: *notification.header.number(),
165+
reorg_info,
166+
},
167+
);
168+
}
119169
}
120170
Poll::Ready(None) => return Poll::Ready(None),
121171
}
@@ -138,7 +188,12 @@ where
138188
if fire {
139189
self.inner_delay = None;
140190

141-
match crate::kv::sync_blocks(
191+
// Temporarily take ownership of best_at_import to avoid borrow checker issues
192+
// (we can't have both an immutable borrow of self.client and a mutable borrow
193+
// of self.best_at_import at the same time)
194+
let mut best_at_import = std::mem::take(&mut self.best_at_import);
195+
196+
let result = crate::kv::sync_blocks(
142197
self.client.as_ref(),
143198
self.substrate_backend.as_ref(),
144199
self.storage_override.clone(),
@@ -148,7 +203,13 @@ where
148203
self.strategy,
149204
self.sync_oracle.clone(),
150205
self.pubsub_notification_sinks.clone(),
151-
) {
206+
&mut best_at_import,
207+
);
208+
209+
// Restore the best_at_import set
210+
self.best_at_import = best_at_import;
211+
212+
match result {
152213
Ok(have_next) => {
153214
self.have_next = have_next;
154215
Poll::Ready(Some(()))

client/mapping-sync/src/lib.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,26 @@ pub enum SyncStrategy {
3434
pub type EthereumBlockNotificationSinks<T> =
3535
parking_lot::Mutex<Vec<sc_utils::mpsc::TracingUnboundedSender<T>>>;
3636

37-
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
37+
/// Information about a chain reorganization.
38+
///
39+
/// When a reorg occurs, this struct contains the blocks that were removed from
40+
/// the canonical chain (retracted) and the blocks that were added (enacted).
41+
/// The `common_ancestor` is the last block that remains canonical in both
42+
/// the old and new chains.
43+
#[derive(Clone, Debug, Eq, PartialEq)]
44+
pub struct ReorgInfo<Block: BlockT> {
45+
/// The common ancestor block hash between the old and new canonical chains.
46+
pub common_ancestor: Block::Hash,
47+
/// Blocks that were removed from the canonical chain (old fork).
48+
pub retracted: Vec<Block::Hash>,
49+
/// Blocks that were added to the canonical chain (new fork).
50+
pub enacted: Vec<Block::Hash>,
51+
}
52+
53+
#[derive(Clone, Debug, Eq, PartialEq)]
3854
pub struct EthereumBlockNotification<Block: BlockT> {
3955
pub is_new_best: bool,
4056
pub hash: Block::Hash,
57+
/// Optional reorg information. Present when this block became best as part of a reorg.
58+
pub reorg_info: Option<ReorgInfo<Block>>,
4159
}

client/mapping-sync/src/sql/mod.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto
2929
// Frontier
3030
use fp_rpc::EthereumRuntimeRPCApi;
3131

32-
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
32+
use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy};
33+
34+
/// Reorg information passed between commands.
35+
#[derive(Debug, Clone)]
36+
pub struct ReorgData {
37+
pub common_ancestor: H256,
38+
pub enacted: Vec<H256>,
39+
pub retracted: Vec<H256>,
40+
}
3341

3442
/// Defines the commands for the sync worker.
3543
#[derive(Debug)]
@@ -39,7 +47,12 @@ pub enum WorkerCommand {
3947
/// Index leaves.
4048
IndexLeaves(Vec<H256>),
4149
/// Index the best block known so far via import notifications.
42-
IndexBestBlock(H256),
50+
/// When `reorg_data` is Some, emits a pubsub notification with reorg info after indexing.
51+
/// When `reorg_data` is None, emits a simple new block notification.
52+
IndexBestBlock {
53+
block_hash: H256,
54+
reorg_data: Option<ReorgData>,
55+
},
4356
/// Canonicalize the enacted and retracted blocks reported via import notifications.
4457
Canonicalize {
4558
common: H256,
@@ -122,19 +135,28 @@ where
122135
.await;
123136
}
124137
}
125-
WorkerCommand::IndexBestBlock(block_hash) => {
138+
WorkerCommand::IndexBestBlock {
139+
block_hash,
140+
reorg_data,
141+
} => {
126142
index_canonical_block_and_ancestors(
127143
client.clone(),
128144
substrate_backend.clone(),
129145
indexer_backend.clone(),
130146
block_hash,
131147
)
132148
.await;
149+
// Emit notification after indexing so blocks are available via storage_override
133150
let sinks = &mut pubsub_notification_sinks.lock();
134151
for sink in sinks.iter() {
135152
let _ = sink.unbounded_send(EthereumBlockNotification {
136153
is_new_best: true,
137154
hash: block_hash,
155+
reorg_info: reorg_data.as_ref().map(|data| ReorgInfo {
156+
common_ancestor: data.common_ancestor,
157+
retracted: data.retracted.clone(),
158+
enacted: data.enacted.clone(),
159+
}),
138160
});
139161
}
140162
}
@@ -145,6 +167,7 @@ where
145167
} => {
146168
canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted)
147169
.await;
170+
// Notification is emitted by IndexBestBlock after indexing completes
148171
}
149172
WorkerCommand::CheckIndexedBlocks => {
150173
// Fix any indexed blocks that did not have their logs indexed
@@ -229,7 +252,7 @@ where
229252
notification.is_new_best,
230253
);
231254
if notification.is_new_best {
232-
if let Some(tree_route) = notification.tree_route {
255+
let reorg_data = if let Some(ref tree_route) = notification.tree_route {
233256
log::debug!(
234257
target: "frontier-sql",
235258
"🔀 Re-org happened at new best {}, proceeding to canonicalize db",
@@ -240,21 +263,38 @@ where
240263
.iter()
241264
.map(|hash_and_number| hash_and_number.hash)
242265
.collect::<Vec<_>>();
243-
let enacted = tree_route
266+
// tree_route.enacted() returns blocks from common ancestor (exclusive)
267+
// to new best (exclusive). We need to include the new best block
268+
// (notification.hash) to get the complete enacted list.
269+
let mut enacted: Vec<_> = tree_route
244270
.enacted()
245271
.iter()
246272
.map(|hash_and_number| hash_and_number.hash)
247-
.collect::<Vec<_>>();
273+
.collect();
274+
enacted.push(notification.hash);
248275

249276
let common = tree_route.common_block().hash;
250277
tx.send(WorkerCommand::Canonicalize {
251278
common,
279+
enacted: enacted.clone(),
280+
retracted: retracted.clone(),
281+
}).await.ok();
282+
Some(ReorgData {
283+
common_ancestor: common,
252284
enacted,
253285
retracted,
254-
}).await.ok();
255-
}
286+
})
287+
} else {
288+
None
289+
};
256290

257-
tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok();
291+
// Index the best block and emit notification after indexing completes.
292+
// This ensures blocks are available via storage_override when the
293+
// notification is processed.
294+
tx.send(WorkerCommand::IndexBestBlock {
295+
block_hash: notification.hash,
296+
reorg_data,
297+
}).await.ok();
258298
}
259299
}
260300
}

0 commit comments

Comments
 (0)