mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(engine-util): reorg interceptor (#10173)
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@ -7034,14 +7034,25 @@ version = "1.0.4"
|
||||
dependencies = [
|
||||
"eyre",
|
||||
"futures",
|
||||
"itertools 0.13.0",
|
||||
"pin-project",
|
||||
"reth-beacon-consensus",
|
||||
"reth-engine-primitives",
|
||||
"reth-errors",
|
||||
"reth-ethereum-forks",
|
||||
"reth-evm",
|
||||
"reth-fs-util",
|
||||
"reth-payload-validator",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
"reth-revm",
|
||||
"reth-rpc",
|
||||
"reth-rpc-types",
|
||||
"reth-rpc-types-compat",
|
||||
"revm-primitives",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
@ -7689,6 +7700,7 @@ dependencies = [
|
||||
"reth-node-events",
|
||||
"reth-node-metrics",
|
||||
"reth-payload-builder",
|
||||
"reth-payload-validator",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
"reth-prune",
|
||||
|
||||
3
book/cli/reth/node.md
vendored
3
book/cli/reth/node.md
vendored
@ -482,6 +482,9 @@ Debug:
|
||||
--debug.skip-new-payload <SKIP_NEW_PAYLOAD>
|
||||
If provided, the engine will skip `n` consecutive new payloads
|
||||
|
||||
--debug.reorg-frequency <REORG_FREQUENCY>
|
||||
If provided, the chain will be reorged at specified frequency
|
||||
|
||||
--debug.engine-api-store <PATH>
|
||||
The path to store engine API messages at. If specified, all of the intercepted engine API messages will be written to specified location
|
||||
|
||||
|
||||
@ -12,31 +12,41 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-primitives.workspace = true
|
||||
reth-errors.workspace = true
|
||||
reth-fs-util.workspace = true
|
||||
reth-rpc.workspace = true
|
||||
reth-rpc-types.workspace = true
|
||||
reth-rpc-types-compat.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-beacon-consensus.workspace = true
|
||||
reth-payload-validator.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-ethereum-forks.workspace = true
|
||||
revm-primitives.workspace = true
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, default-features = false }
|
||||
tokio-util.workspace = true
|
||||
pin-project.workspace = true
|
||||
|
||||
# misc
|
||||
eyre.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
# io
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
# misc
|
||||
eyre.workspace = true
|
||||
itertools.workspace = true
|
||||
|
||||
# tracing
|
||||
tracing.workspace = true
|
||||
|
||||
# async
|
||||
futures.workspace = true
|
||||
|
||||
[features]
|
||||
optimism = [
|
||||
"reth-rpc/optimism",
|
||||
"reth-beacon-consensus/optimism",
|
||||
"reth-ethereum-forks/optimism"
|
||||
]
|
||||
|
||||
@ -134,10 +134,10 @@ impl<S> EngineStoreStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Engine, S> Stream for EngineStoreStream<S>
|
||||
impl<S, Engine> Stream for EngineStoreStream<S>
|
||||
where
|
||||
Engine: EngineTypes,
|
||||
S: Stream<Item = BeaconEngineMessage<Engine>>,
|
||||
Engine: EngineTypes,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
@ -146,7 +146,7 @@ where
|
||||
let next = ready!(this.stream.poll_next_unpin(cx));
|
||||
if let Some(msg) = &next {
|
||||
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
|
||||
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
|
||||
error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
|
||||
}
|
||||
}
|
||||
Poll::Ready(next)
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
use futures::Stream;
|
||||
use reth_beacon_consensus::BeaconEngineMessage;
|
||||
use reth_engine_primitives::EngineTypes;
|
||||
use reth_payload_validator::ExecutionPayloadValidator;
|
||||
use std::path::PathBuf;
|
||||
use tokio_util::either::Either;
|
||||
|
||||
@ -15,6 +16,9 @@ use skip_fcu::EngineSkipFcu;
|
||||
pub mod skip_new_payload;
|
||||
use skip_new_payload::EngineSkipNewPayload;
|
||||
|
||||
pub mod reorg;
|
||||
use reorg::EngineReorg;
|
||||
|
||||
/// The collection of stream extensions for engine API message stream.
|
||||
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
|
||||
Stream<Item = BeaconEngineMessage<Engine>>
|
||||
@ -89,6 +93,45 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
|
||||
Either::Right(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates reorgs with specified frequency.
|
||||
fn reorg<Provider, Evm>(
|
||||
self,
|
||||
provider: Provider,
|
||||
evm_config: Evm,
|
||||
payload_validator: ExecutionPayloadValidator,
|
||||
frequency: usize,
|
||||
) -> EngineReorg<Self, Engine, Provider, Evm>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
EngineReorg::new(self, provider, evm_config, payload_validator, frequency)
|
||||
}
|
||||
|
||||
/// If frequency is [Some], returns the stream that creates reorgs with
|
||||
/// specified frequency. Otherwise, returns `Self`.
|
||||
fn maybe_reorg<Provider, Evm>(
|
||||
self,
|
||||
provider: Provider,
|
||||
evm_config: Evm,
|
||||
payload_validator: ExecutionPayloadValidator,
|
||||
frequency: Option<usize>,
|
||||
) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
if let Some(frequency) = frequency {
|
||||
Either::Left(reorg::EngineReorg::new(
|
||||
self,
|
||||
provider,
|
||||
evm_config,
|
||||
payload_validator,
|
||||
frequency,
|
||||
))
|
||||
} else {
|
||||
Either::Right(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Engine, T> EngineMessageStreamExt<Engine> for T
|
||||
|
||||
408
crates/engine/util/src/reorg.rs
Normal file
408
crates/engine/util/src/reorg.rs
Normal file
@ -0,0 +1,408 @@
|
||||
//! Stream wrapper that simulates reorgs.
|
||||
|
||||
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
|
||||
use itertools::Either;
|
||||
use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated};
|
||||
use reth_engine_primitives::EngineTypes;
|
||||
use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
|
||||
use reth_ethereum_forks::EthereumHardforks;
|
||||
use reth_evm::{system_calls::apply_beacon_root_contract_call, ConfigureEvm};
|
||||
use reth_payload_validator::ExecutionPayloadValidator;
|
||||
use reth_primitives::{
|
||||
eip4844::calculate_excess_blob_gas, proofs, Block, Header, Receipt, Receipts, U256,
|
||||
};
|
||||
use reth_provider::{BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory};
|
||||
use reth_revm::{
|
||||
database::StateProviderDatabase,
|
||||
db::{states::bundle_state::BundleRetention, State},
|
||||
state_change::post_block_withdrawals_balance_increments,
|
||||
DatabaseCommit,
|
||||
};
|
||||
use reth_rpc_types::{
|
||||
engine::{CancunPayloadFields, ForkchoiceState, PayloadStatus},
|
||||
ExecutionPayload,
|
||||
};
|
||||
use reth_rpc_types_compat::engine::payload::block_to_payload;
|
||||
use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
pin::Pin,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum EngineReorgState<Engine: EngineTypes> {
|
||||
Forward,
|
||||
Reorg { queue: VecDeque<BeaconEngineMessage<Engine>> },
|
||||
}
|
||||
|
||||
type EngineReorgResponse = Result<
|
||||
Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
|
||||
oneshot::error::RecvError,
|
||||
>;
|
||||
|
||||
/// Engine API stream wrapper that simulates reorgs with specified frequency.
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
|
||||
/// Underlying stream
|
||||
#[pin]
|
||||
stream: S,
|
||||
/// Database provider.
|
||||
provider: Provider,
|
||||
/// Evm configuration.
|
||||
evm_config: Evm,
|
||||
/// Payload validator.
|
||||
payload_validator: ExecutionPayloadValidator,
|
||||
/// The frequency of reorgs.
|
||||
frequency: usize,
|
||||
/// The number of forwarded forkchoice states.
|
||||
/// This is reset after a reorg.
|
||||
forkchoice_states_forwarded: usize,
|
||||
/// Current state of the stream.
|
||||
state: EngineReorgState<Engine>,
|
||||
/// Last forkchoice state.
|
||||
last_forkchoice_state: Option<ForkchoiceState>,
|
||||
/// Pending engine responses to reorg messages.
|
||||
reorg_responses: FuturesUnordered<BoxFuture<'static, EngineReorgResponse>>,
|
||||
}
|
||||
|
||||
impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm> {
|
||||
/// Creates new [`EngineReorg`] stream wrapper.
|
||||
pub fn new(
|
||||
stream: S,
|
||||
provider: Provider,
|
||||
evm_config: Evm,
|
||||
payload_validator: ExecutionPayloadValidator,
|
||||
frequency: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
provider,
|
||||
evm_config,
|
||||
payload_validator,
|
||||
frequency,
|
||||
state: EngineReorgState::Forward,
|
||||
forkchoice_states_forwarded: 0,
|
||||
last_forkchoice_state: None,
|
||||
reorg_responses: FuturesUnordered::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Engine, Provider, Evm> Stream for EngineReorg<S, Engine, Provider, Evm>
|
||||
where
|
||||
S: Stream<Item = BeaconEngineMessage<Engine>>,
|
||||
Engine: EngineTypes,
|
||||
Provider: BlockReader + StateProviderFactory,
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
loop {
|
||||
if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
|
||||
match response {
|
||||
Ok(Either::Left(Ok(payload_status))) => {
|
||||
debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
|
||||
}
|
||||
Ok(Either::Left(Err(payload_error))) => {
|
||||
error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
|
||||
}
|
||||
Ok(Either::Right(Ok(fcu_status))) => {
|
||||
debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
|
||||
}
|
||||
Ok(Either::Right(Err(fcu_error))) => {
|
||||
error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
|
||||
}
|
||||
Err(_) => {}
|
||||
};
|
||||
continue
|
||||
}
|
||||
|
||||
if let EngineReorgState::Reorg { queue } = &mut this.state {
|
||||
match queue.pop_front() {
|
||||
Some(msg) => return Poll::Ready(Some(msg)),
|
||||
None => {
|
||||
*this.forkchoice_states_forwarded = 0;
|
||||
*this.state = EngineReorgState::Forward;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let next = ready!(this.stream.poll_next_unpin(cx));
|
||||
let item = match next {
|
||||
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
|
||||
if this.forkchoice_states_forwarded > this.frequency {
|
||||
if let Some(last_forkchoice_state) = this
|
||||
.last_forkchoice_state
|
||||
// Only enter reorg state if new payload attaches to current head.
|
||||
.filter(|state| state.head_block_hash == payload.parent_hash())
|
||||
{
|
||||
// Enter the reorg state.
|
||||
// The current payload will be immediately forwarded by being in front
|
||||
// of the queue. Then we attempt to reorg the current head by generating
|
||||
// a payload that attaches to the head's parent and is based on the
|
||||
// non-conflicting transactions (txs from block `n + 1` that are valid
|
||||
// at block `n` according to consensus checks) from the current payload
|
||||
// as well as the corresponding forkchoice state. We will rely on CL to
|
||||
// reorg us back to canonical chain.
|
||||
// TODO: This is an expensive blocking operation, ideally it's spawned
|
||||
// as a task so that the stream could yield the control back.
|
||||
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
|
||||
this.provider,
|
||||
this.evm_config,
|
||||
this.payload_validator,
|
||||
payload.clone(),
|
||||
cancun_fields.clone(),
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(error) => {
|
||||
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
|
||||
// Forward the payload and attempt to create reorg on top of the
|
||||
// next one
|
||||
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
|
||||
payload,
|
||||
cancun_fields,
|
||||
tx,
|
||||
}))
|
||||
}
|
||||
};
|
||||
let reorg_forkchoice_state = ForkchoiceState {
|
||||
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
|
||||
safe_block_hash: last_forkchoice_state.safe_block_hash,
|
||||
head_block_hash: reorg_payload.block_hash(),
|
||||
};
|
||||
|
||||
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
|
||||
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
|
||||
this.reorg_responses.extend([
|
||||
Box::pin(reorg_payload_rx.map_ok(Either::Left))
|
||||
as BoxFuture<'static, EngineReorgResponse>,
|
||||
Box::pin(reorg_fcu_rx.map_ok(Either::Right))
|
||||
as BoxFuture<'static, EngineReorgResponse>,
|
||||
]);
|
||||
|
||||
*this.state = EngineReorgState::Reorg {
|
||||
queue: VecDeque::from([
|
||||
// Current payload
|
||||
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
|
||||
// Reorg payload
|
||||
BeaconEngineMessage::NewPayload {
|
||||
payload: reorg_payload,
|
||||
cancun_fields: reorg_cancun_fields,
|
||||
tx: reorg_payload_tx,
|
||||
},
|
||||
// Reorg forkchoice state
|
||||
BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state: reorg_forkchoice_state,
|
||||
payload_attrs: None,
|
||||
tx: reorg_fcu_tx,
|
||||
},
|
||||
]),
|
||||
};
|
||||
continue
|
||||
}
|
||||
}
|
||||
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
|
||||
}
|
||||
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
|
||||
// Record last forkchoice state forwarded to the engine.
|
||||
// We do not care if it's valid since engine should be able to handle
|
||||
// reorgs that rely on invalid forkchoice state.
|
||||
*this.last_forkchoice_state = Some(state);
|
||||
*this.forkchoice_states_forwarded += 1;
|
||||
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
|
||||
}
|
||||
item => item,
|
||||
};
|
||||
return Poll::Ready(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn create_reorg_head<Provider, Evm>(
|
||||
provider: &Provider,
|
||||
evm_config: &Evm,
|
||||
payload_validator: &ExecutionPayloadValidator,
|
||||
next_payload: ExecutionPayload,
|
||||
next_cancun_fields: Option<CancunPayloadFields>,
|
||||
) -> RethResult<(ExecutionPayload, Option<CancunPayloadFields>)>
|
||||
where
|
||||
Provider: BlockReader + StateProviderFactory,
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
let chain_spec = payload_validator.chain_spec();
|
||||
|
||||
// Ensure next payload is valid.
|
||||
let next_block = payload_validator
|
||||
.ensure_well_formed_payload(next_payload, next_cancun_fields.into())
|
||||
.map_err(RethError::msg)?;
|
||||
|
||||
// Fetch reorg target block and its parent
|
||||
let reorg_target = provider
|
||||
.block_by_hash(next_block.parent_hash)?
|
||||
.ok_or_else(|| ProviderError::HeaderNotFound(next_block.parent_hash.into()))?;
|
||||
let reorg_target_parent = provider
|
||||
.block_by_hash(reorg_target.parent_hash)?
|
||||
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;
|
||||
|
||||
// Configure state
|
||||
let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
|
||||
let mut state = State::builder()
|
||||
.with_database_ref(StateProviderDatabase::new(&state_provider))
|
||||
.with_bundle_update()
|
||||
.build();
|
||||
|
||||
// Configure environments
|
||||
let mut cfg = CfgEnvWithHandlerCfg::new(Default::default(), Default::default());
|
||||
let mut block_env = BlockEnv::default();
|
||||
evm_config.fill_cfg_and_block_env(
|
||||
&mut cfg,
|
||||
&mut block_env,
|
||||
chain_spec,
|
||||
&reorg_target.header,
|
||||
U256::MAX,
|
||||
);
|
||||
let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default());
|
||||
let mut evm = evm_config.evm_with_env(&mut state, env);
|
||||
|
||||
// apply eip-4788 pre block contract call
|
||||
apply_beacon_root_contract_call(
|
||||
evm_config,
|
||||
chain_spec,
|
||||
reorg_target.timestamp,
|
||||
reorg_target.number,
|
||||
reorg_target.parent_beacon_block_root,
|
||||
&mut evm,
|
||||
)?;
|
||||
|
||||
let mut cumulative_gas_used = 0;
|
||||
let mut sum_blob_gas_used = 0;
|
||||
let mut transactions = Vec::new();
|
||||
let mut receipts = Vec::new();
|
||||
let mut versioned_hashes = Vec::new();
|
||||
for tx in next_block.body {
|
||||
// ensure we still have capacity for this transaction
|
||||
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
|
||||
continue
|
||||
}
|
||||
|
||||
// Configure the environment for the block.
|
||||
let tx_recovered = tx.clone().try_into_ecrecovered().map_err(|_| {
|
||||
BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError)
|
||||
})?;
|
||||
evm_config.fill_tx_env(evm.tx_mut(), &tx_recovered, tx_recovered.signer());
|
||||
let exec_result = match evm.transact() {
|
||||
Ok(result) => result,
|
||||
error @ Err(EVMError::Transaction(_) | EVMError::Header(_)) => {
|
||||
trace!(target: "engine::stream::reorg", hash = %tx.hash(), ?error, "Error executing transaction from next block");
|
||||
continue
|
||||
}
|
||||
// Treat error as fatal
|
||||
Err(error) => {
|
||||
return Err(RethError::Execution(BlockExecutionError::Validation(
|
||||
BlockValidationError::EVM { hash: tx.hash, error: Box::new(error) },
|
||||
)))
|
||||
}
|
||||
};
|
||||
evm.db_mut().commit(exec_result.state);
|
||||
|
||||
if let Some(blob_tx) = tx.transaction.as_eip4844() {
|
||||
sum_blob_gas_used += blob_tx.blob_gas();
|
||||
versioned_hashes.extend(blob_tx.blob_versioned_hashes.clone());
|
||||
}
|
||||
|
||||
cumulative_gas_used += exec_result.result.gas_used();
|
||||
#[allow(clippy::needless_update)] // side-effect of optimism fields
|
||||
receipts.push(Some(Receipt {
|
||||
tx_type: tx.tx_type(),
|
||||
success: exec_result.result.is_success(),
|
||||
cumulative_gas_used,
|
||||
logs: exec_result.result.into_logs().into_iter().map(Into::into).collect(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// append transaction to the list of executed transactions
|
||||
transactions.push(tx);
|
||||
}
|
||||
drop(evm);
|
||||
|
||||
if let Some(withdrawals) = &reorg_target.withdrawals {
|
||||
state.increment_balances(post_block_withdrawals_balance_increments(
|
||||
chain_spec,
|
||||
reorg_target.timestamp,
|
||||
withdrawals,
|
||||
))?;
|
||||
}
|
||||
|
||||
// merge all transitions into bundle state, this would apply the withdrawal balance changes
|
||||
// and 4788 contract call
|
||||
state.merge_transitions(BundleRetention::PlainState);
|
||||
|
||||
let outcome = ExecutionOutcome::new(
|
||||
state.take_bundle(),
|
||||
Receipts::from(vec![receipts]),
|
||||
reorg_target.number,
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
let (blob_gas_used, excess_blob_gas) =
|
||||
if chain_spec.is_cancun_active_at_timestamp(reorg_target.timestamp) {
|
||||
(
|
||||
Some(sum_blob_gas_used),
|
||||
Some(calculate_excess_blob_gas(
|
||||
reorg_target_parent.excess_blob_gas.unwrap_or_default(),
|
||||
reorg_target_parent.blob_gas_used.unwrap_or_default(),
|
||||
)),
|
||||
)
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let reorg_block = Block {
|
||||
header: Header {
|
||||
// Set same fields as the reorg target
|
||||
parent_hash: reorg_target.header.parent_hash,
|
||||
ommers_hash: reorg_target.header.ommers_hash,
|
||||
beneficiary: reorg_target.header.beneficiary,
|
||||
difficulty: reorg_target.header.difficulty,
|
||||
number: reorg_target.header.number,
|
||||
gas_limit: reorg_target.header.gas_limit,
|
||||
timestamp: reorg_target.header.timestamp,
|
||||
extra_data: reorg_target.header.extra_data,
|
||||
mix_hash: reorg_target.header.mix_hash,
|
||||
nonce: reorg_target.header.nonce,
|
||||
base_fee_per_gas: reorg_target.header.base_fee_per_gas,
|
||||
parent_beacon_block_root: reorg_target.header.parent_beacon_block_root,
|
||||
withdrawals_root: reorg_target.header.withdrawals_root,
|
||||
|
||||
// Compute or add new fields
|
||||
transactions_root: proofs::calculate_transaction_root(&transactions),
|
||||
receipts_root: outcome.receipts_root_slow(reorg_target.header.number).unwrap(),
|
||||
logs_bloom: outcome.block_logs_bloom(reorg_target.header.number).unwrap(),
|
||||
requests_root: None, // TODO(prague)
|
||||
gas_used: cumulative_gas_used,
|
||||
blob_gas_used,
|
||||
excess_blob_gas,
|
||||
state_root: state_provider.state_root(outcome.state())?,
|
||||
},
|
||||
body: transactions,
|
||||
ommers: reorg_target.ommers,
|
||||
withdrawals: reorg_target.withdrawals,
|
||||
requests: None, // TODO(prague)
|
||||
}
|
||||
.seal_slow();
|
||||
|
||||
Ok((
|
||||
block_to_payload(reorg_block),
|
||||
reorg_target
|
||||
.header
|
||||
.parent_beacon_block_root
|
||||
.map(|root| CancunPayloadFields { parent_beacon_block_root: root, versioned_hashes }),
|
||||
))
|
||||
}
|
||||
@ -32,10 +32,10 @@ impl<S> EngineSkipFcu<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Engine, S> Stream for EngineSkipFcu<S>
|
||||
impl<S, Engine> Stream for EngineSkipFcu<S>
|
||||
where
|
||||
Engine: EngineTypes,
|
||||
S: Stream<Item = BeaconEngineMessage<Engine>>,
|
||||
Engine: EngineTypes,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
@ -48,7 +48,7 @@ where
|
||||
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
|
||||
if this.skipped < this.threshold {
|
||||
*this.skipped += 1;
|
||||
tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
|
||||
tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
|
||||
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
|
||||
continue
|
||||
} else {
|
||||
|
||||
@ -28,10 +28,10 @@ impl<S> EngineSkipNewPayload<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Engine, S> Stream for EngineSkipNewPayload<S>
|
||||
impl<S, Engine> Stream for EngineSkipNewPayload<S>
|
||||
where
|
||||
Engine: EngineTypes,
|
||||
S: Stream<Item = BeaconEngineMessage<Engine>>,
|
||||
Engine: EngineTypes,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
@ -45,7 +45,7 @@ where
|
||||
if this.skipped < this.threshold {
|
||||
*this.skipped += 1;
|
||||
tracing::warn!(
|
||||
target: "engine::intercept",
|
||||
target: "engine::stream::skip_new_payload",
|
||||
block_number = payload.block_number(),
|
||||
block_hash = %payload.block_hash(),
|
||||
?cancun_fields,
|
||||
|
||||
@ -50,6 +50,7 @@ reth-engine-util.workspace = true
|
||||
reth-cli-util.workspace = true
|
||||
reth-rpc-eth-types.workspace = true
|
||||
reth-network-api.workspace = true
|
||||
reth-payload-validator.workspace = true
|
||||
|
||||
## async
|
||||
futures.workspace = true
|
||||
|
||||
@ -182,6 +182,12 @@ where
|
||||
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
|
||||
.maybe_skip_fcu(node_config.debug.skip_fcu)
|
||||
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
|
||||
.maybe_reorg(
|
||||
ctx.blockchain_db().clone(),
|
||||
ctx.components().evm_config().clone(),
|
||||
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
|
||||
node_config.debug.reorg_frequency,
|
||||
)
|
||||
// Store messages _after_ skipping so that `replay-engine` command
|
||||
// would replay only the messages that were observed by the engine
|
||||
// during this run.
|
||||
|
||||
@ -50,6 +50,10 @@ pub struct DebugArgs {
|
||||
#[arg(long = "debug.skip-new-payload", help_heading = "Debug")]
|
||||
pub skip_new_payload: Option<usize>,
|
||||
|
||||
/// If provided, the chain will be reorged at specified frequency.
|
||||
#[arg(long = "debug.reorg-frequency", help_heading = "Debug")]
|
||||
pub reorg_frequency: Option<usize>,
|
||||
|
||||
/// The path to store engine API messages at.
|
||||
/// If specified, all of the intercepted engine API messages
|
||||
/// will be written to specified location.
|
||||
|
||||
Reference in New Issue
Block a user