feat: make EthService generic over engine types and block executor (#10212)

This commit is contained in:
Federico Gimenez
2024-08-08 19:22:55 +02:00
committed by GitHub
parent 75a501e9fa
commit c4aae6db36
7 changed files with 92 additions and 69 deletions

63
Cargo.lock generated
View File

@ -6982,6 +6982,37 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "reth-engine-service"
version = "1.0.4"
dependencies = [
"futures",
"pin-project",
"reth-beacon-consensus",
"reth-blockchain-tree",
"reth-chainspec",
"reth-consensus",
"reth-db-api",
"reth-engine-primitives",
"reth-engine-tree",
"reth-ethereum-engine-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-exex-types",
"reth-network-p2p",
"reth-payload-builder",
"reth-payload-validator",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-stages-api",
"reth-tasks",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "reth-engine-tree" name = "reth-engine-tree"
version = "1.0.4" version = "1.0.4"
@ -7145,36 +7176,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "reth-ethereum-engine"
version = "1.0.4"
dependencies = [
"futures",
"pin-project",
"reth-beacon-consensus",
"reth-blockchain-tree",
"reth-chainspec",
"reth-consensus",
"reth-db-api",
"reth-engine-tree",
"reth-ethereum-engine-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-exex-types",
"reth-network-p2p",
"reth-payload-builder",
"reth-payload-validator",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-stages-api",
"reth-tasks",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "reth-ethereum-engine-primitives" name = "reth-ethereum-engine-primitives"
version = "1.0.4" version = "1.0.4"
@ -7787,8 +7788,8 @@ dependencies = [
"reth-consensus", "reth-consensus",
"reth-db", "reth-db",
"reth-e2e-test-utils", "reth-e2e-test-utils",
"reth-engine-service",
"reth-engine-tree", "reth-engine-tree",
"reth-ethereum-engine",
"reth-ethereum-engine-primitives", "reth-ethereum-engine-primitives",
"reth-ethereum-payload-builder", "reth-ethereum-payload-builder",
"reth-evm-ethereum", "reth-evm-ethereum",

View File

@ -27,6 +27,7 @@ members = [
"crates/consensus/debug-client/", "crates/consensus/debug-client/",
"crates/e2e-test-utils/", "crates/e2e-test-utils/",
"crates/engine/primitives/", "crates/engine/primitives/",
"crates/engine/service",
"crates/engine/tree/", "crates/engine/tree/",
"crates/engine/util/", "crates/engine/util/",
"crates/errors/", "crates/errors/",
@ -35,7 +36,6 @@ members = [
"crates/ethereum/cli/", "crates/ethereum/cli/",
"crates/ethereum/consensus/", "crates/ethereum/consensus/",
"crates/ethereum/engine-primitives/", "crates/ethereum/engine-primitives/",
"crates/ethereum/engine/",
"crates/ethereum/evm", "crates/ethereum/evm",
"crates/ethereum/node", "crates/ethereum/node",
"crates/ethereum/payload/", "crates/ethereum/payload/",
@ -297,13 +297,13 @@ reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" } reth-ecies = { path = "crates/net/ecies" }
reth-engine-primitives = { path = "crates/engine/primitives" } reth-engine-primitives = { path = "crates/engine/primitives" }
reth-engine-tree = { path = "crates/engine/tree" } reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-service = { path = "crates/engine/service" }
reth-engine-util = { path = "crates/engine/util" } reth-engine-util = { path = "crates/engine/util" }
reth-errors = { path = "crates/errors" } reth-errors = { path = "crates/errors" }
reth-eth-wire = { path = "crates/net/eth-wire" } reth-eth-wire = { path = "crates/net/eth-wire" }
reth-eth-wire-types = { path = "crates/net/eth-wire-types" } reth-eth-wire-types = { path = "crates/net/eth-wire-types" }
reth-ethereum-cli = { path = "crates/ethereum/cli" } reth-ethereum-cli = { path = "crates/ethereum/cli" }
reth-ethereum-consensus = { path = "crates/ethereum/consensus" } reth-ethereum-consensus = { path = "crates/ethereum/consensus" }
reth-ethereum-engine = { path = "crates/ethereum/engine" }
reth-ethereum-engine-primitives = { path = "crates/ethereum/engine-primitives" } reth-ethereum-engine-primitives = { path = "crates/ethereum/engine-primitives" }
reth-ethereum-forks = { path = "crates/ethereum-forks" } reth-ethereum-forks = { path = "crates/ethereum-forks" }
reth-ethereum-payload-builder = { path = "crates/ethereum/payload" } reth-ethereum-payload-builder = { path = "crates/ethereum/payload" }

View File

@ -1,5 +1,5 @@
[package] [package]
name = "reth-ethereum-engine" name = "reth-engine-service"
version.workspace = true version.workspace = true
edition.workspace = true edition.workspace = true
rust-version.workspace = true rust-version.workspace = true
@ -14,17 +14,18 @@ workspace = true
# reth # reth
reth-beacon-consensus.workspace = true reth-beacon-consensus.workspace = true
reth-chainspec.workspace = true reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-db-api.workspace = true reth-db-api.workspace = true
reth-engine-primitives.workspace = true
reth-engine-tree.workspace = true reth-engine-tree.workspace = true
reth-ethereum-engine-primitives.workspace = true reth-evm.workspace = true
reth-evm-ethereum.workspace = true
reth-network-p2p.workspace = true reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-validator.workspace = true reth-payload-validator.workspace = true
reth-provider.workspace = true reth-provider.workspace = true
reth-prune.workspace = true reth-prune.workspace = true
reth-stages-api.workspace = true reth-stages-api.workspace = true
reth-tasks.workspace = true reth-tasks.workspace = true
reth-payload-builder.workspace = true
# async # async
futures.workspace = true futures.workspace = true
@ -38,9 +39,10 @@ thiserror.workspace = true
reth-blockchain-tree.workspace = true reth-blockchain-tree.workspace = true
reth-consensus.workspace = true reth-consensus.workspace = true
reth-engine-tree = { workspace = true, features = ["test-utils"] } reth-engine-tree = { workspace = true, features = ["test-utils"] }
reth-evm.workspace = true reth-ethereum-engine-primitives.workspace = true
reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true reth-exex-types.workspace = true
reth-primitives.workspace = true reth-primitives.workspace = true
reth-prune-types.workspace = true reth-prune-types.workspace = true
tokio = { workspace = true, features = ["sync"] } tokio = { workspace = true, features = ["sync"] }

View File

@ -1,12 +1,12 @@
//! Ethereum engine implementation. //! Engine service implementation.
#![doc( #![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)] )]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
/// Ethereum engine service. /// Engine Service
pub mod service; pub mod service;

View File

@ -1,8 +1,10 @@
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EthBeaconConsensus}; use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_chainspec::ChainSpec; use reth_chainspec::ChainSpec;
use reth_consensus::Consensus;
use reth_db_api::database::Database; use reth_db_api::database::Database;
use reth_engine_primitives::EngineTypes;
use reth_engine_tree::{ use reth_engine_tree::{
backfill::PipelineSync, backfill::PipelineSync,
download::BasicBlockDownloader, download::BasicBlockDownloader,
@ -14,8 +16,7 @@ pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator}, chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent, engine::EngineApiEvent,
}; };
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::execute::BlockExecutorProvider;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_network_p2p::BlockClient; use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator; use reth_payload_validator::ExecutionPayloadValidator;
@ -24,58 +25,64 @@ use reth_prune::Pruner;
use reth_stages_api::Pipeline; use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
use std::{ use std::{
marker::PhantomData,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
/// Alias for Ethereum chain orchestrator. /// Alias for chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator< type EngineServiceType<DB, Client, T> = ChainOrchestrator<
EngineHandler< EngineHandler<
EngineApiRequestHandler<EngineApiRequest<EthEngineTypes>>, EngineApiRequestHandler<EngineApiRequest<T>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>, UnboundedReceiverStream<BeaconEngineMessage<T>>,
BasicBlockDownloader<Client>, BasicBlockDownloader<Client>,
>, >,
PipelineSync<DB>, PipelineSync<DB>,
>; >;
/// The type that drives the Ethereum chain forward and communicates progress. /// The type that drives the chain forward and communicates progress.
#[pin_project] #[pin_project]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EthService<DB, Client> pub struct EngineService<DB, Client, E, T>
where where
DB: Database + 'static, DB: Database + 'static,
Client: BlockClient + 'static, Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes,
{ {
orchestrator: EthServiceType<DB, Client>, orchestrator: EngineServiceType<DB, Client, T>,
_marker: PhantomData<E>,
} }
impl<DB, Client> EthService<DB, Client> impl<DB, Client, E, T> EngineService<DB, Client, E, T>
where where
DB: Database + 'static, DB: Database + 'static,
Client: BlockClient + 'static, Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes + 'static,
{ {
/// Constructor for `EthService`. /// Constructor for `EngineService`.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
consensus: Arc<dyn Consensus>,
executor_factory: E,
chain_spec: Arc<ChainSpec>, chain_spec: Arc<ChainSpec>,
client: Client, client: Client,
incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>, incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<T>>,
pipeline: Pipeline<DB>, pipeline: Pipeline<DB>,
pipeline_task_spawner: Box<dyn TaskSpawner>, pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<DB>, provider: ProviderFactory<DB>,
blockchain_db: BlockchainProvider2<DB>, blockchain_db: BlockchainProvider2<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>, pruner: Pruner<DB, ProviderFactory<DB>>,
payload_builder: PayloadBuilderHandle<EthEngineTypes>, payload_builder: PayloadBuilderHandle<T>,
tree_config: TreeConfig, tree_config: TreeConfig,
) -> Self { ) -> Self {
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let downloader = BasicBlockDownloader::new(client, consensus.clone()); let downloader = BasicBlockDownloader::new(client, consensus.clone());
let persistence_handle = PersistenceHandle::spawn_service(provider, pruner); let persistence_handle = PersistenceHandle::spawn_service(provider, pruner);
let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone()); let payload_validator = ExecutionPayloadValidator::new(chain_spec);
let executor_factory = EthExecutorProvider::ethereum(chain_spec);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
@ -95,19 +102,24 @@ where
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) } Self {
orchestrator: ChainOrchestrator::new(handler, backfill_sync),
_marker: Default::default(),
}
} }
/// Returns a mutable reference to the orchestrator. /// Returns a mutable reference to the orchestrator.
pub fn orchestrator_mut(&mut self) -> &mut EthServiceType<DB, Client> { pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<DB, Client, T> {
&mut self.orchestrator &mut self.orchestrator
} }
} }
impl<DB, Client> Stream for EthService<DB, Client> impl<DB, Client, E, T> Stream for EngineService<DB, Client, E, T>
where where
DB: Database + 'static, DB: Database + 'static,
Client: BlockClient + 'static, Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes + 'static,
{ {
type Item = ChainEvent<BeaconConsensusEngineEvent>; type Item = ChainEvent<BeaconConsensusEngineEvent>;
@ -117,17 +129,19 @@ where
} }
} }
/// Potential error returned by `EthService`. /// Potential error returned by `EngineService`.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Eth service error.")] #[error("Engine service error.")]
pub struct EthServiceError {} pub struct EngineServiceError {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_engine_tree::test_utils::TestPipelineBuilder; use reth_engine_tree::test_utils::TestPipelineBuilder;
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_exex_types::FinishedExExHeight; use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::test_utils::TestFullBlockClient; use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::SealedHeader; use reth_primitives::SealedHeader;
@ -145,6 +159,7 @@ mod tests {
.paris_activated() .paris_activated()
.build(), .build(),
); );
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let client = TestFullBlockClient::default(); let client = TestFullBlockClient::default();
@ -155,6 +170,7 @@ mod tests {
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default(); let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
let blockchain_db = let blockchain_db =
BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default()) BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default())
.unwrap(); .unwrap();
@ -164,7 +180,9 @@ mod tests {
Pruner::<_, ProviderFactory<_>>::new(provider_factory.clone(), vec![], 0, 0, None, rx); Pruner::<_, ProviderFactory<_>>::new(provider_factory.clone(), vec![], 0, 0, None, rx);
let (tx, _rx) = unbounded_channel(); let (tx, _rx) = unbounded_channel();
let _eth_service = EthService::new( let _eth_service = EngineService::new(
consensus,
executor_factory,
chain_spec, chain_spec,
client, client,
incoming_requests, incoming_requests,

View File

@ -14,7 +14,7 @@ workspace = true
# reth # reth
reth-payload-builder.workspace = true reth-payload-builder.workspace = true
reth-ethereum-engine-primitives.workspace = true reth-ethereum-engine-primitives.workspace = true
reth-ethereum-engine.workspace = true reth-engine-service.workspace = true
reth-basic-payload-builder.workspace = true reth-basic-payload-builder.workspace = true
reth-ethereum-payload-builder.workspace = true reth-ethereum-payload-builder.workspace = true
reth-node-builder.workspace = true reth-node-builder.workspace = true

View File

@ -6,8 +6,8 @@ use reth_beacon_consensus::{
BeaconConsensusEngineHandle, BeaconConsensusEngineHandle,
}; };
use reth_blockchain_tree::BlockchainTreeConfig; use reth_blockchain_tree::BlockchainTreeConfig;
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::tree::TreeConfig; use reth_engine_tree::tree::TreeConfig;
use reth_ethereum_engine::service::{ChainEvent, EthService};
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
use reth_network::{ use reth_network::{
@ -173,7 +173,9 @@ where
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
// Configure the consensus engine // Configure the consensus engine
let mut eth_service = EthService::new( let mut eth_service = EngineService::new(
ctx.consensus(),
ctx.components().block_executor().clone(),
ctx.chain_spec(), ctx.chain_spec(),
network_client.clone(), network_client.clone(),
UnboundedReceiverStream::new(consensus_engine_rx), UnboundedReceiverStream::new(consensus_engine_rx),