feat: add Consensus to ExecutionStage (#14447)

This commit is contained in:
Arsenii Kulikov
2025-02-12 18:48:14 +04:00
committed by GitHub
parent 3e07d65751
commit 172369afd5
31 changed files with 355 additions and 194 deletions

View File

@ -11,7 +11,6 @@ repository.workspace = true
[dependencies]
# reth
reth-ethereum-consensus.workspace = true
reth-chainspec.workspace = true
reth-cli.workspace = true
reth-ethereum-cli.workspace = true

View File

@ -5,11 +5,11 @@ use clap::Parser;
use reth_chainspec::EthChainSpec;
use reth_cli::chainspec::ChainSpecParser;
use reth_config::{config::EtlConfig, Config};
use reth_consensus::noop::NoopConsensus;
use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
use reth_db::{init_db, open_db_read_only, DatabaseEnv};
use reth_db_common::init::init_genesis;
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
use reth_node_builder::{NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_core::{
args::{DatabaseArgs, DatadirArgs},
@ -199,3 +199,33 @@ impl AccessRights {
/// [`NodeTypes`](reth_node_builder::NodeTypes) in CLI.
pub trait CliNodeTypes: NodeTypesWithEngine + NodeTypesForProvider {}
impl<N> CliNodeTypes for N where N: NodeTypesWithEngine + NodeTypesForProvider {}
/// Helper trait aggregating components required for the CLI.
pub trait CliNodeComponents<N: CliNodeTypes> {
/// Block executor.
type Executor: BlockExecutorProvider<Primitives = N::Primitives>;
/// Consensus implementation.
type Consensus: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static;
/// Returns the block executor.
fn executor(&self) -> &Self::Executor;
/// Returns the consensus implementation.
fn consensus(&self) -> &Self::Consensus;
}
impl<N: CliNodeTypes, E, C> CliNodeComponents<N> for (E, C)
where
E: BlockExecutorProvider<Primitives = N::Primitives>,
C: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static,
{
type Executor = E;
type Consensus = C;
fn executor(&self) -> &Self::Executor {
&self.0
}
fn consensus(&self) -> &Self::Consensus {
&self.1
}
}

View File

@ -1,12 +1,12 @@
//! Command that initializes the node by importing a chain from a file.
use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
use alloy_primitives::B256;
use clap::Parser;
use futures::{Stream, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_config::Config;
use reth_consensus::{Consensus, ConsensusError};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_db::tables;
use reth_db_api::transaction::DbTx;
use reth_downloaders::{
@ -14,7 +14,6 @@ use reth_downloaders::{
file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_ethereum_consensus::EthBeaconConsensus;
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
@ -58,11 +57,11 @@ pub struct ImportCommand<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportCommand<C> {
/// Execute `import` command
pub async fn execute<N, E, F>(self, executor: F) -> eyre::Result<()>
pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
E: BlockExecutorProvider<Primitives = N::Primitives>,
F: FnOnce(Arc<N::ChainSpec>) -> E,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<N::ChainSpec>) -> Comp,
{
info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
@ -77,8 +76,9 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let executor = executor(provider_factory.chain_spec());
let consensus = Arc::new(EthBeaconConsensus::new(self.env.chain.clone()));
let components = components(provider_factory.chain_spec());
let executor = components.executor().clone();
let consensus = Arc::new(components.consensus().clone());
info!(target: "reth::cli", "Consensus engine initialized");
// open file
@ -179,7 +179,7 @@ pub fn build_import_pipeline<N, C, E>(
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent<N::Primitives>>)>
where
N: ProviderNodeTypes + CliNodeTypes,
C: Consensus<BlockTy<N>, Error = ConsensusError> + 'static,
C: FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
E: BlockExecutorProvider<Primitives = N::Primitives>,
{
if !file_client.has_canonical_blocks() {

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use super::setup;
use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
use reth_db::{tables, DatabaseEnv};
use reth_db_api::{
cursor::DbCursorRO, database::Database, table::TableImporter, transaction::DbTx,
@ -16,17 +17,19 @@ use reth_provider::{
use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;
pub(crate) async fn dump_execution_stage<N, E>(
pub(crate) async fn dump_execution_stage<N, E, C>(
db_tool: &DbTool<N>,
from: u64,
to: u64,
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
executor: E,
consensus: C,
) -> eyre::Result<()>
where
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
E: BlockExecutorProvider<Primitives = N::Primitives>,
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
{
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
@ -44,6 +47,7 @@ where
to,
from,
executor,
consensus,
)?;
}
@ -139,8 +143,10 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.database_provider_rw()?;
let mut exec_stage =
ExecutionStage::new_with_executor(NoopBlockExecutorProvider::<N::Primitives>::default());
let mut exec_stage = ExecutionStage::new_with_executor(
NoopBlockExecutorProvider::<N::Primitives>::default(),
NoopConsensus::arc(),
);
exec_stage.unwind(
&provider,
@ -162,19 +168,21 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
}
/// Try to re-execute the stage without committing
fn dry_run<N, E>(
fn dry_run<N, E, C>(
output_provider_factory: ProviderFactory<N>,
to: u64,
from: u64,
executor: E,
consensus: C,
) -> eyre::Result<()>
where
N: ProviderNodeTypes,
E: BlockExecutorProvider<Primitives = N::Primitives>,
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
{
info!(target: "reth::cli", "Executing stage. [dry-run]");
let mut exec_stage = ExecutionStage::new_with_executor(executor);
let mut exec_stage = ExecutionStage::new_with_executor(executor, Arc::new(consensus));
let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };

View File

@ -4,6 +4,7 @@ use super::setup;
use alloy_primitives::BlockNumber;
use eyre::Result;
use reth_config::config::EtlConfig;
use reth_consensus::noop::NoopConsensus;
use reth_db::{tables, DatabaseEnv};
use reth_db_api::{database::Database, table::TableImporter};
use reth_db_common::DbTool;
@ -96,6 +97,7 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new(
NoopBlockExecutorProvider::<N::Primitives>::default(), // Not necessary for unwinding.
NoopConsensus::arc(),
ExecutionStageThresholds {
max_blocks: Some(u64::MAX),
max_changes: None,

View File

@ -1,5 +1,5 @@
//! Database debugging tool
use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
use clap::Parser;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
@ -9,7 +9,6 @@ use reth_db_api::{
transaction::DbTx,
};
use reth_db_common::DbTool;
use reth_evm::execute::BlockExecutorProvider;
use reth_node_builder::NodeTypesWithDB;
use reth_node_core::{
args::DatadirArgs,
@ -80,29 +79,31 @@ macro_rules! handle_stage {
$stage_fn($tool, *from, *to, output_datadir, *dry_run).await?
}};
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr) => {{
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr) => {{
let StageCommand { output_datadir, from, to, dry_run, .. } = $command;
let output_datadir =
output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default());
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor).await?
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus).await?
}};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `dump-stage` command
pub async fn execute<N, E, F>(self, executor: F) -> eyre::Result<()>
pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
E: BlockExecutorProvider<Primitives = N::Primitives>,
F: FnOnce(Arc<C::ChainSpec>) -> E,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let tool = DbTool::new(provider_factory)?;
match &self.command {
Stages::Execution(cmd) => {
let executor = executor(tool.chain());
handle_stage!(dump_execution_stage, &tool, cmd, executor)
let components = components(tool.chain());
let executor = components.executor().clone();
let consensus = components.consensus().clone();
handle_stage!(dump_execution_stage, &tool, cmd, executor, consensus)
}
Stages::StorageHashing(cmd) => handle_stage!(dump_hashing_storage_stage, &tool, cmd),
Stages::AccountHashing(cmd) => handle_stage!(dump_hashing_account_stage, &tool, cmd),

View File

@ -2,13 +2,12 @@
use std::sync::Arc;
use crate::common::CliNodeTypes;
use crate::common::{CliNodeComponents, CliNodeTypes};
use clap::{Parser, Subcommand};
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_eth_wire::NetPrimitivesFor;
use reth_evm::execute::BlockExecutorProvider;
pub mod drop;
pub mod dump;
@ -42,17 +41,17 @@ pub enum Subcommands<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `stage` command
pub async fn execute<N, E, F, P>(self, ctx: CliContext, executor: F) -> eyre::Result<()>
pub async fn execute<N, Comp, F, P>(self, ctx: CliContext, components: F) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
E: BlockExecutorProvider<Primitives = N::Primitives>,
F: FnOnce(Arc<C::ChainSpec>) -> E,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
P: NetPrimitivesFor<N::Primitives>,
{
match self.command {
Subcommands::Run(command) => command.execute::<N, _, _, P>(ctx, executor).await,
Subcommands::Run(command) => command.execute::<N, _, _, P>(ctx, components).await,
Subcommands::Drop(command) => command.execute::<N>().await,
Subcommands::Dump(command) => command.execute::<N, _, _>(executor).await,
Subcommands::Dump(command) => command.execute::<N, _, _>(components).await,
Subcommands::Unwind(command) => command.execute::<N>().await,
}
}

View File

@ -2,7 +2,7 @@
//!
//! Stage debugging tool
use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::Sealable;
use clap::Parser;
@ -17,8 +17,6 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_eth_wire::NetPrimitivesFor;
use reth_ethereum_consensus::EthBeaconConsensus;
use reth_evm::execute::BlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_network::BlockDownloaderProvider;
use reth_network_p2p::HeadersClient;
@ -105,11 +103,11 @@ pub struct Command<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `stage` command
pub async fn execute<N, E, F, P>(self, ctx: CliContext, executor: F) -> eyre::Result<()>
pub async fn execute<N, Comp, F, P>(self, ctx: CliContext, components: F) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
E: BlockExecutorProvider<Primitives = N::Primitives>,
F: FnOnce(Arc<C::ChainSpec>) -> E,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
P: NetPrimitivesFor<N::Primitives>,
{
// Raise the fd limit of the process.
@ -120,6 +118,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
self.env.init::<N>(AccessRights::RW)?;
let mut provider_rw = provider_factory.database_provider_rw()?;
let components = components(provider_factory.chain_spec());
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
@ -162,8 +161,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Headers => {
let consensus =
Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
let consensus = Arc::new(components.consensus().clone());
let network_secret_path = self
.network
@ -215,8 +213,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
)
}
StageEnum::Bodies => {
let consensus =
Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
let consensus = Arc::new(components.consensus().clone());
let mut config = config;
config.peers.trusted_nodes_only = self.network.trusted_only;
@ -267,7 +264,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
),
StageEnum::Execution => (
Box::new(ExecutionStage::new(
executor(provider_factory.chain_spec()),
components.executor().clone(),
Arc::new(components.consensus().clone()),
ExecutionStageThresholds {
max_blocks: Some(batch_size),
max_changes: None,

View File

@ -115,9 +115,14 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let builder = if self.offline {
Pipeline::<N>::builder().add_stages(
OfflineStages::new(executor, config.stages, prune_modes.clone())
.builder()
.disable(reth_stages::StageId::SenderRecovery),
OfflineStages::new(
executor,
NoopConsensus::arc(),
config.stages,
prune_modes.clone(),
)
.builder()
.disable(reth_stages::StageId::SenderRecovery),
)
} else {
Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
@ -133,6 +138,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
)
.set(ExecutionStage::new(
executor,
Arc::new(NoopConsensus::default()),
ExecutionStageThresholds {
max_blocks: None,
max_changes: None,