feat: integrate BlockchainProvider2 in EthNodeLauncher (#9754)

This commit is contained in:
Federico Gimenez
2024-07-24 13:13:40 +02:00
committed by GitHub
parent fa1bbe44d6
commit 5616b4efad
7 changed files with 99 additions and 58 deletions

1
Cargo.lock generated
View File

@ -7767,6 +7767,7 @@ dependencies = [
"reth-auto-seal-consensus", "reth-auto-seal-consensus",
"reth-basic-payload-builder", "reth-basic-payload-builder",
"reth-beacon-consensus", "reth-beacon-consensus",
"reth-blockchain-tree",
"reth-chainspec", "reth-chainspec",
"reth-consensus", "reth-consensus",
"reth-db", "reth-db",

View File

@ -19,7 +19,7 @@ use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}; use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient};
use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator; use reth_payload_validator::ExecutionPayloadValidator;
use reth_provider::{providers::BlockchainProvider, ProviderFactory}; use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
use reth_prune::Pruner; use reth_prune::Pruner;
use reth_stages_api::Pipeline; use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
@ -65,7 +65,7 @@ where
pipeline: Pipeline<DB>, pipeline: Pipeline<DB>,
pipeline_task_spawner: Box<dyn TaskSpawner>, pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<DB>, provider: ProviderFactory<DB>,
blockchain_db: BlockchainProvider<DB>, blockchain_db: BlockchainProvider2<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>, pruner: Pruner<DB, ProviderFactory<DB>>,
payload_builder: PayloadBuilderHandle<EthEngineTypes>, payload_builder: PayloadBuilderHandle<EthEngineTypes>,
) -> Self { ) -> Self {
@ -123,18 +123,13 @@ pub struct EthServiceError {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
};
use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_consensus::test_utils::TestConsensus;
use reth_engine_tree::test_utils::TestPipelineBuilder; use reth_engine_tree::test_utils::TestPipelineBuilder;
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex_types::FinishedExExHeight; use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::test_utils::TestFullBlockClient; use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::SealedHeader; use reth_primitives::SealedHeader;
use reth_provider::test_utils::create_test_provider_factory_with_chain_spec; use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;
use reth_prune::PruneModes;
use reth_tasks::TokioTaskExecutor; use reth_tasks::TokioTaskExecutor;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch}; use tokio::sync::{mpsc::unbounded_channel, watch};
@ -157,23 +152,9 @@ mod tests {
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone()); let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default(); let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let consensus = Arc::new(TestConsensus::default());
let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory);
let tree = Arc::new(ShareableBlockchainTree::new(
BlockchainTree::new(
externals,
BlockchainTreeConfig::new(1, 2, 3, 2),
PruneModes::default(),
)
.expect("failed to create tree"),
));
let blockchain_db = BlockchainProvider::with_latest( let blockchain_db =
provider_factory.clone(), BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default());
tree,
SealedHeader::default(),
);
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs); let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
let pruner = let pruner =

View File

@ -35,6 +35,7 @@ reth-tokio-util.workspace = true
reth-node-events.workspace = true reth-node-events.workspace = true
reth-node-core.workspace = true reth-node-core.workspace = true
reth-exex.workspace = true reth-exex.workspace = true
reth-blockchain-tree.workspace = true
# misc # misc
eyre.workspace = true eyre.workspace = true

View File

@ -5,6 +5,7 @@ use reth_beacon_consensus::{
hooks::{EngineHooks, StaticFileHook}, hooks::{EngineHooks, StaticFileHook},
BeaconConsensusEngineHandle, BeaconConsensusEngineHandle,
}; };
use reth_blockchain_tree::BlockchainTreeConfig;
use reth_ethereum_engine::service::{ChainEvent, EthService}; use reth_ethereum_engine::service::{ChainEvent, EthService};
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
@ -23,7 +24,7 @@ use reth_node_core::{
version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA}, version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
}; };
use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_provider::providers::BlockchainProvider; use reth_provider::providers::BlockchainProvider2;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi}; use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::engine::ClientVersionV1; use reth_rpc_types::engine::ClientVersionV1;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
@ -49,7 +50,7 @@ impl EthNodeLauncher {
impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EthNodeLauncher impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EthNodeLauncher
where where
T: FullNodeTypes< T: FullNodeTypes<
Provider = BlockchainProvider<<T as FullNodeTypes>::DB>, Provider = BlockchainProvider2<<T as FullNodeTypes>::DB>,
Engine = EthEngineTypes, Engine = EthEngineTypes,
>, >,
CB: NodeComponentsBuilder<T>, CB: NodeComponentsBuilder<T>,
@ -72,6 +73,15 @@ where
} = target; } = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// TODO: move tree_config and canon_state_notification_sender
// initialization to with_blockchain_db once the engine revamp is done
// https://github.com/paradigmxyz/reth/issues/8742
let tree_config = BlockchainTreeConfig::default();
// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
// setup the launch context // setup the launch context
let ctx = ctx let ctx = ctx
.with_configured_globals() .with_configured_globals()
@ -99,7 +109,9 @@ where
.with_metrics_task() .with_metrics_task()
// passing FullNodeTypes as type parameter here so that we can build // passing FullNodeTypes as type parameter here so that we can build
// later the components. // later the components.
.with_blockchain_db::<T>()? .with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider2::new(provider_factory)?)
}, tree_config, canon_state_notification_sender)?
.with_components(components_builder, on_component_initialized).await?; .with_components(components_builder, on_component_initialized).await?;
// spawn exexs // spawn exexs

View File

@ -1,11 +1,17 @@
//! Node builder setup tests. //! Node builder setup tests.
use reth_db::test_utils::create_test_rw_db; use std::sync::Arc;
use reth_db::{
test_utils::{create_test_rw_db, TempDatabase},
DatabaseEnv,
};
use reth_node_builder::{FullNodeComponents, NodeBuilder, NodeConfig}; use reth_node_builder::{FullNodeComponents, NodeBuilder, NodeConfig};
use reth_node_ethereum::{ use reth_node_ethereum::{
launch::EthNodeLauncher, launch::EthNodeLauncher,
node::{EthereumAddOns, EthereumNode}, node::{EthereumAddOns, EthereumNode},
}; };
use reth_provider::providers::BlockchainProvider2;
use reth_tasks::TaskManager; use reth_tasks::TaskManager;
#[test] #[test]
@ -45,7 +51,7 @@ async fn test_eth_launcher() {
let db = create_test_rw_db(); let db = create_test_rw_db();
let _builder = NodeBuilder::new(config) let _builder = NodeBuilder::new(config)
.with_database(db) .with_database(db)
.with_types::<EthereumNode>() .with_types_and_provider::<EthereumNode, BlockchainProvider2<Arc<TempDatabase<DatabaseEnv>>>>()
.with_components(EthereumNode::components()) .with_components(EthereumNode::components())
.with_add_ons::<EthereumAddOns>() .with_add_ons::<EthereumAddOns>()
.launch_with_fn(|builder| { .launch_with_fn(|builder| {

View File

@ -11,8 +11,7 @@ use rayon::ThreadPoolBuilder;
use reth_auto_seal_consensus::MiningMode; use reth_auto_seal_consensus::MiningMode;
use reth_beacon_consensus::EthBeaconConsensus; use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{ use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
TreeExternals,
}; };
use reth_chainspec::{Chain, ChainSpec}; use reth_chainspec::{Chain, ChainSpec};
use reth_config::{config::EtlConfig, PruneConfig}; use reth_config::{config::EtlConfig, PruneConfig};
@ -29,8 +28,9 @@ use reth_node_core::{
}; };
use reth_primitives::{BlockNumber, Head, B256}; use reth_primitives::{BlockNumber, Head, B256};
use reth_provider::{ use reth_provider::{
providers::{BlockchainProvider, StaticFileProvider}, providers::{BlockchainProvider, BlockchainProvider2, StaticFileProvider},
CanonStateNotificationSender, ProviderFactory, StaticFileProviderFactory, CanonStateNotificationSender, FullProvider, ProviderFactory, StaticFileProviderFactory,
TreeViewer,
}; };
use reth_prune::{PruneModes, PrunerBuilder}; use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_builder::config::RethRpcServerConfig;
@ -45,6 +45,27 @@ use tokio::sync::{
oneshot, watch, oneshot, watch,
}; };
/// Allows to set a tree viewer for a configured blockchain provider.
// TODO: remove this helper trait once the engine revamp is done, the new
// blockchain provider won't require a TreeViewer.
// https://github.com/paradigmxyz/reth/issues/8742
pub trait WithTree {
/// Setter for tree viewer.
fn set_tree(self, tree: Arc<dyn TreeViewer>) -> Self;
}
impl<DB: Database> WithTree for BlockchainProvider<DB> {
fn set_tree(self, tree: Arc<dyn TreeViewer>) -> Self {
self.with_tree(tree)
}
}
impl<DB: Database> WithTree for BlockchainProvider2<DB> {
fn set_tree(self, _tree: Arc<dyn TreeViewer>) -> Self {
self
}
}
/// Reusable setup for launching a node. /// Reusable setup for launching a node.
/// ///
/// This provides commonly used boilerplate for launching a node. /// This provides commonly used boilerplate for launching a node.
@ -523,24 +544,18 @@ where
} }
/// Creates a `BlockchainProvider` and attaches it to the launch context. /// Creates a `BlockchainProvider` and attaches it to the launch context.
pub fn with_blockchain_db<T>( pub fn with_blockchain_db<T, F>(
self, self,
create_blockchain_provider: F,
tree_config: BlockchainTreeConfig,
canon_state_notification_sender: CanonStateNotificationSender,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>> ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>>
where where
T: FullNodeTypes<Provider = BlockchainProvider<<T as FullNodeTypes>::DB>>, T: FullNodeTypes,
T::Provider: FullProvider<DB>,
F: FnOnce(ProviderFactory<DB>) -> eyre::Result<T::Provider>,
{ {
let tree_config = BlockchainTreeConfig::default(); let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_db = BlockchainProvider::new(
self.provider_factory().clone(),
Arc::new(NoopBlockchainTree::with_canon_state_notifications(
canon_state_notification_sender.clone(),
)),
)?;
let metered_providers = WithMeteredProviders { let metered_providers = WithMeteredProviders {
db_provider_container: WithMeteredProvider { db_provider_container: WithMeteredProvider {
@ -566,7 +581,8 @@ where
impl<DB, T> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>> impl<DB, T> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>
where where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static, DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>, T: FullNodeTypes,
T::Provider: FullProvider<DB> + WithTree,
{ {
/// Returns access to the underlying database. /// Returns access to the underlying database.
pub fn database(&self) -> &DB { pub fn database(&self) -> &DB {
@ -592,8 +608,8 @@ where
self.right().db_provider_container.metrics_sender.clone() self.right().db_provider_container.metrics_sender.clone()
} }
/// Returns a reference to the `BlockchainProvider`. /// Returns a reference to the blockchain provider.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> { pub const fn blockchain_db(&self) -> &T::Provider {
&self.right().blockchain_db &self.right().blockchain_db
} }
@ -648,7 +664,7 @@ where
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
// Replace the tree component with the actual tree // Replace the tree component with the actual tree
let blockchain_db = self.blockchain_db().clone().with_tree(blockchain_tree); let blockchain_db = self.blockchain_db().clone().set_tree(blockchain_tree);
debug!(target: "reth::cli", "configured blockchain tree"); debug!(target: "reth::cli", "configured blockchain tree");
@ -685,7 +701,8 @@ where
impl<DB, T, CB> LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>> impl<DB, T, CB> LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>
where where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static, DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>, T: FullNodeTypes,
T::Provider: FullProvider<DB> + WithTree,
CB: NodeComponentsBuilder<T>, CB: NodeComponentsBuilder<T>,
{ {
/// Returns the configured `ProviderFactory`. /// Returns the configured `ProviderFactory`.
@ -722,8 +739,8 @@ where
&self.right().node_adapter &self.right().node_adapter
} }
/// Returns a reference to the `BlockchainProvider`. /// Returns a reference to the blockchain provider.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> { pub const fn blockchain_db(&self) -> &T::Provider {
&self.right().blockchain_db &self.right().blockchain_db
} }
@ -819,9 +836,14 @@ pub struct WithMeteredProvider<DB> {
/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`] /// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender. /// and a metrics sender.
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB, T> { pub struct WithMeteredProviders<DB, T>
where
DB: Database,
T: FullNodeTypes,
T::Provider: FullProvider<DB>,
{
db_provider_container: WithMeteredProvider<DB>, db_provider_container: WithMeteredProvider<DB>,
blockchain_db: BlockchainProvider<DB>, blockchain_db: T::Provider,
canon_state_notification_sender: CanonStateNotificationSender, canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig, tree_config: BlockchainTreeConfig,
// this field is used to store a reference to the FullNodeTypes so that we // this field is used to store a reference to the FullNodeTypes so that we
@ -833,12 +855,14 @@ pub struct WithMeteredProviders<DB, T> {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct WithComponents<DB, T, CB> pub struct WithComponents<DB, T, CB>
where where
T: FullNodeTypes<Provider = BlockchainProvider<DB>>, DB: Database,
T: FullNodeTypes,
T::Provider: FullProvider<DB>,
CB: NodeComponentsBuilder<T>, CB: NodeComponentsBuilder<T>,
{ {
db_provider_container: WithMeteredProvider<DB>, db_provider_container: WithMeteredProvider<DB>,
tree_config: BlockchainTreeConfig, tree_config: BlockchainTreeConfig,
blockchain_db: BlockchainProvider<DB>, blockchain_db: T::Provider,
node_adapter: NodeAdapter<T, CB::Components>, node_adapter: NodeAdapter<T, CB::Components>,
head: Head, head: Head,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,

View File

@ -13,6 +13,7 @@ use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook}, hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine, BeaconConsensusEngine,
}; };
use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_engine_util::EngineMessageStreamExt; use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
@ -119,6 +120,19 @@ where
} = target; } = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// TODO: remove tree and move tree_config and canon_state_notification_sender
// initialization to with_blockchain_db once the engine revamp is done
// https://github.com/paradigmxyz/reth/issues/8742
let tree_config = BlockchainTreeConfig::default();
// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let tree = Arc::new(NoopBlockchainTree::with_canon_state_notifications(
canon_state_notification_sender.clone(),
));
// setup the launch context // setup the launch context
let ctx = ctx let ctx = ctx
.with_configured_globals() .with_configured_globals()
@ -146,7 +160,9 @@ where
.with_metrics_task() .with_metrics_task()
// passing FullNodeTypes as type parameter here so that we can build // passing FullNodeTypes as type parameter here so that we can build
// later the components. // later the components.
.with_blockchain_db::<T>()? .with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider::new(provider_factory, tree)?)
}, tree_config, canon_state_notification_sender)?
.with_components(components_builder, on_component_initialized).await?; .with_components(components_builder, on_component_initialized).await?;
// spawn exexs // spawn exexs