Skip to content

Commit 3069865

Browse files
committed
feat: install payload delay
1 parent 767d457 commit 3069865

File tree

6 files changed

+164
-0
lines changed

6 files changed

+164
-0
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ serde_json = "1"
235235
thiserror = "2"
236236
futures = "0.3"
237237
url = "2.5"
238+
parking_lot = "0.12"
238239

239240
# misc-testing
240241
rstest = "0.18.2"

bin/odyssey/src/main.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use eyre::Context;
3030
use odyssey_node::{
3131
broadcaster::periodic_broadcaster,
3232
chainspec::OdysseyChainSpecParser,
33+
delayed_resolve::{DelayedResolver, MAX_DELAY_INTO_SLOT},
3334
forwarder::forward_raw_transactions,
3435
node::OdysseyNode,
3536
rpc::{EthApiExt, EthApiOverrideServer},
@@ -40,6 +41,7 @@ use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, Node
4041
use reth_optimism_cli::Cli;
4142
use reth_optimism_node::{args::RollupArgs, node::OpAddOnsBuilder};
4243
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
44+
use std::time::Duration;
4345
use tracing::{info, warn};
4446

4547
#[global_allocator]
@@ -110,6 +112,18 @@ fn main() {
110112
ctx.modules.merge_configured(walltime.into_rpc())?;
111113
info!(target: "reth::cli", "Walltime configured");
112114

115+
// wrap the getPayloadV3 method in a delay
116+
let engine_module = ctx.auth_module.module_mut().clone();
117+
let delay_into_slot = std::env::var("MAX_PAYLOAD_DELAY")
118+
.ok()
119+
.and_then(|val| val.parse::<u64>().map(Duration::from_millis).ok())
120+
.unwrap_or(MAX_DELAY_INTO_SLOT);
121+
122+
let delayed_payload = DelayedResolver::new(engine_module, delay_into_slot);
123+
delayed_payload.clone().spawn(ctx.provider().canonical_state_stream());
124+
ctx.auth_module.replace_auth_methods(delayed_payload.into_rpc_module())?;
125+
info!(target: "reth::cli", "Configured payload delay");
126+
113127
Ok(())
114128
})
115129
.launch_with_fn(|builder| {

crates/node/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ reth-trie-common.workspace = true
3333
reth-trie-db.workspace = true
3434
reth-network.workspace = true
3535
reth-network-types.workspace = true
36+
reth-chain-state.workspace = true
3637

3738
alloy-consensus.workspace = true
3839
alloy-eips.workspace = true
@@ -50,6 +51,9 @@ tokio.workspace = true
5051
tracing.workspace = true
5152
eyre.workspace = true
5253
jsonrpsee.workspace = true
54+
futures.workspace = true
55+
parking_lot.workspace = true
56+
serde.workspace = true
5357

5458
[lints]
5559
workspace = true

crates/node/src/delayed_resolve.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
//! Helper that delays resolving the payload
2+
3+
use futures::{Stream, StreamExt};
4+
use jsonrpsee::{
5+
core::traits::ToRpcParams,
6+
types::{error::INVALID_PARAMS_CODE, ErrorObject, Params},
7+
MethodsError, RpcModule,
8+
};
9+
use parking_lot::Mutex;
10+
use reth_chain_state::CanonStateNotification;
11+
use serde::de::Error;
12+
use serde_json::value::RawValue;
13+
use std::{
14+
sync::Arc,
15+
time::{Duration, Instant},
16+
};
17+
18+
/// Delay into the slot
19+
pub const MAX_DELAY_INTO_SLOT: Duration = Duration::from_millis(500);
20+
21+
/// The getpayload fn we want to delay
22+
pub const GET_PAYLOAD_V3: &str = "engine_getPayloadV3";
23+
24+
/// A helper that tracks the block clock timestamp and can delay resolving the payload to give the
25+
/// payload builder more time to build a block.
26+
#[derive(Debug, Clone)]
27+
pub struct DelayedResolver {
28+
inner: Arc<DelayedResolverInner>,
29+
}
30+
31+
impl DelayedResolver {
32+
/// Creates a new instance with the engine module and the duration we should target
33+
pub fn new(engine_module: RpcModule<()>, max_delay_into_slot: Duration) -> Self {
34+
Self {
35+
inner: Arc::new(DelayedResolverInner {
36+
last_block_time: Mutex::new(Instant::now()),
37+
engine_module,
38+
max_delay_into_slot,
39+
}),
40+
}
41+
}
42+
43+
/// Listen for new blocks and track the local timestamp.
44+
pub fn spawn<St>(self, mut st: St)
45+
where
46+
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
47+
{
48+
tokio::task::spawn(async move {
49+
while st.next().await.is_some() {
50+
*self.inner.last_block_time.lock() = Instant::now();
51+
}
52+
});
53+
}
54+
55+
async fn call(&self, params: Params<'static>) -> Result<String, MethodsError> {
56+
let last = *self.inner.last_block_time.lock();
57+
let now = Instant::now();
58+
// how far we're into the slot
59+
let offset = now.duration_since(last);
60+
61+
if offset < self.inner.max_delay_into_slot {
62+
// if we received the request before the max delay exceeded we can delay the request to
63+
// give the payload builder more time to build the payload.
64+
let delay = self.inner.max_delay_into_slot - offset;
65+
tokio::time::sleep(delay).await;
66+
}
67+
68+
let params = params
69+
.as_str()
70+
.ok_or_else(|| MethodsError::Parse(serde_json::Error::missing_field("payload id")))?;
71+
72+
self.inner.engine_module.call(GET_PAYLOAD_V3, PayloadParam(params.to_string())).await
73+
}
74+
75+
/// Converts this type into a new [`RpcModule`] that delegates the get payload call.
76+
pub fn into_rpc_module(self) -> RpcModule<()> {
77+
let mut module = RpcModule::new(());
78+
module
79+
.register_async_method(GET_PAYLOAD_V3, move |params, _ctx, _| {
80+
let value = self.clone();
81+
async move {
82+
value.call(params).await.map_err(|err| match err {
83+
MethodsError::JsonRpc(err) => err,
84+
err => ErrorObject::owned(
85+
INVALID_PARAMS_CODE,
86+
format!("invalid payload call: {:?}", err),
87+
None::<()>,
88+
),
89+
})
90+
}
91+
})
92+
.unwrap();
93+
94+
module
95+
}
96+
}
97+
98+
#[derive(Debug)]
99+
struct DelayedResolverInner {
100+
/// Tracks the time when the last block was emitted
101+
last_block_time: Mutex<Instant>,
102+
engine_module: RpcModule<()>,
103+
/// By how much we want to delay getPayload into the slot
104+
max_delay_into_slot: Duration,
105+
}
106+
107+
struct PayloadParam(String);
108+
109+
impl ToRpcParams for PayloadParam {
110+
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
111+
RawValue::from_string(self.0).map(Some)
112+
}
113+
}
114+
115+
#[cfg(test)]
116+
mod tests {
117+
use super::*;
118+
use alloy_rpc_types::engine::PayloadId;
119+
120+
#[tokio::test]
121+
async fn test_delayed_forward() {
122+
use jsonrpsee::{core::RpcResult, RpcModule};
123+
124+
let mut module = RpcModule::new(());
125+
module
126+
.register_method::<RpcResult<PayloadId>, _>(GET_PAYLOAD_V3, |params, _, _| {
127+
params.one::<PayloadId>()
128+
})
129+
.unwrap();
130+
131+
let id = PayloadId::default();
132+
133+
let echo: PayloadId = module.call(GET_PAYLOAD_V3, [id]).await.unwrap();
134+
assert_eq!(echo, id);
135+
136+
let delayer = DelayedResolver::new(module, MAX_DELAY_INTO_SLOT).into_rpc_module();
137+
let echo: PayloadId = delayer.call(GET_PAYLOAD_V3, [id]).await.unwrap();
138+
assert_eq!(echo, id);
139+
}
140+
}

crates/node/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
pub mod broadcaster;
1919
pub mod chainspec;
20+
pub mod delayed_resolve;
2021
pub mod evm;
2122
pub mod forwarder;
2223
pub mod node;

0 commit comments

Comments
 (0)