feat: integrate Node traits into LaunchContextWith (#8993)

This commit is contained in:
Federico Gimenez
2024-06-24 12:21:52 +02:00
committed by GitHub
parent bd0f676d06
commit 08b1e88272
2 changed files with 240 additions and 123 deletions

View File

@ -1,18 +1,28 @@
//! Helper types that can be used by launchers.
use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use backon::{ConstantBuilder, Retryable};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_auto_seal_consensus::MiningMode;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_chainspec::{Chain, ChainSpec};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitDatabaseError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::FullNodeTypes;
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
@ -29,7 +39,7 @@ use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism};
use std::{marker::PhantomData, sync::Arc, thread::available_parallelism};
use tokio::sync::{
mpsc::{unbounded_channel, Receiver, UnboundedSender},
oneshot, watch,
@ -509,9 +519,12 @@ where
}
/// Creates a `BlockchainProvider` and attaches it to the launch context.
pub async fn with_blockchain_db(
pub async fn with_blockchain_db<T>(
self,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>> {
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>>
where
T: FullNodeTypes<Provider = BlockchainProvider<<T as FullNodeTypes>::DB>>,
{
let tree_config = BlockchainTreeConfig::default();
// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
@ -526,11 +539,15 @@ where
)?;
let metered_providers = WithMeteredProviders {
provider_factory: self.provider_factory().clone(),
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
metrics_sender: self.sync_metrics_tx(),
tree_config,
canon_state_notification_sender,
// we store here a reference to T.
phantom_data: PhantomData,
};
let ctx = LaunchContextWith {
@ -542,9 +559,10 @@ where
}
}
impl<DB> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>
impl<DB, T> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
{
/// Returns access to the underlying database.
pub fn database(&self) -> &DB {
@ -553,29 +571,7 @@ where
/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().provider_factory
}
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
}
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}
/// Returns the max block that the node should run to, looking it up from the network if
/// necessary
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
C: HeadersClient,
{
self.node_config().max_block(client, self.provider_factory().clone()).await
&self.right().db_provider_container.provider_factory
}
/// Fetches the head block from the database.
@ -589,7 +585,7 @@ where
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().metrics_sender.clone()
self.right().db_provider_container.metrics_sender.clone()
}
/// Returns a reference to the `BlockchainProvider`.
@ -606,6 +602,149 @@ where
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
}
/// Creates a `NodeAdapter` and attaches it to the launch context.
pub async fn with_components<CB>(
self,
components_builder: CB,
on_component_initialized: Box<
dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>>
where
CB: NodeComponentsBuilder<T>,
{
// fetch the head block from the database
let head = self.lookup_head()?;
let builder_ctx = BuilderContext::new(
head,
self.blockchain_db().clone(),
self.task_executor().clone(),
self.configs().clone(),
);
debug!(target: "reth::cli", "creating components");
let components = components_builder.build_components(&builder_ctx).await?;
let consensus: Arc<dyn Consensus> = Arc::new(components.consensus().clone());
let tree_externals = TreeExternals::new(
self.provider_factory().clone(),
consensus.clone(),
components.block_executor().clone(),
);
let tree = BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())?
.with_sync_metrics_tx(self.sync_metrics_tx())
// Note: This is required because we need to ensure that both the components and the
// tree are using the same channel for canon state notifications. This will be removed
// once the Blockchain provider no longer depends on an instance of the tree
.with_canon_state_notification_sender(self.canon_state_notification_sender());
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
// Replace the tree component with the actual tree
let blockchain_db = self.blockchain_db().clone().with_tree(blockchain_tree);
debug!(target: "reth::cli", "configured blockchain tree");
let node_adapter = NodeAdapter {
components,
task_executor: self.task_executor().clone(),
provider: blockchain_db.clone(),
};
debug!(target: "reth::cli", "calling on_component_initialized hook");
on_component_initialized.on_event(node_adapter.clone())?;
let components_container = WithComponents {
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
tree_config: self.right().tree_config,
node_adapter,
head,
consensus,
};
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| components_container),
};
Ok(ctx)
}
}
impl<DB, T, CB> LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().db_provider_container.provider_factory
}
/// Returns the max block that the node should run to, looking it up from the network if
/// necessary
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
C: HeadersClient,
{
self.node_config().max_block(client, self.provider_factory().clone()).await
}
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
}
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}
/// Returns the current head block.
pub const fn head(&self) -> Head {
self.right().head
}
/// Returns the configured `NodeAdapter`.
pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
&self.right().node_adapter
}
/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}
/// Returns the configured `Consensus`.
pub fn consensus(&self) -> Arc<dyn Consensus> {
self.right().consensus.clone()
}
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
}
/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}
/// Returns the node adapter components.
pub const fn components(&self) -> &CB::Components {
&self.node_adapter().components
}
}
/// Joins two attachments together.
@ -668,25 +807,42 @@ pub struct WithConfigs {
pub toml_config: reth_config::Config,
}
/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB> {
provider_factory: ProviderFactory<DB>,
blockchain_db: BlockchainProvider<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig,
}
/// Helper container type to bundle athe [`ProviderFactory`] and the metrics
/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
}
/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB, T> {
db_provider_container: WithMeteredProvider<DB>,
blockchain_db: BlockchainProvider<DB>,
canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig,
// this field is used to store a reference to the FullNodeTypes so that we
// can build the components in `with_components` method.
phantom_data: PhantomData<T>,
}
/// Helper container to bundle the metered providers container and [`NodeAdapter`].
#[allow(missing_debug_implementations)]
pub struct WithComponents<DB, T, CB>
where
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
db_provider_container: WithMeteredProvider<DB>,
tree_config: BlockchainTreeConfig,
blockchain_db: BlockchainProvider<DB>,
node_adapter: NodeAdapter<T, CB::Components>,
head: Head,
consensus: Arc<dyn Consensus>,
}
#[cfg(test)]
mod tests {
use super::{LaunchContext, NodeConfig};

View File

@ -5,19 +5,17 @@ use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::NodeHooks,
node::FullNode,
BuilderContext, NodeBuilderWithComponents, NodeHandle,
NodeBuilderWithComponents, NodeHandle,
};
use futures::{future::Either, stream, stream_select, StreamExt};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine,
};
use reth_blockchain_tree::{BlockchainTree, ShareableBlockchainTree, TreeExternals};
use reth_consensus::Consensus;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_exex::ExExManagerHandle;
use reth_network::NetworkEvents;
use reth_node_api::{FullNodeComponents, FullNodeTypes};
use reth_node_api::FullNodeTypes;
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
engine::EngineMessageStreamExt,
@ -101,6 +99,7 @@ where
add_ons: NodeAddOns { hooks, rpc, exexs: installed_exex },
config,
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// setup the launch context
let ctx = ctx
@ -127,61 +126,23 @@ where
info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
})
.with_metrics()
.with_blockchain_db().await?;
// fetch the head block from the database
let head = ctx.lookup_head()?;
let builder_ctx = BuilderContext::new(
head,
ctx.blockchain_db().clone(),
ctx.task_executor().clone(),
ctx.configs().clone(),
);
debug!(target: "reth::cli", "creating components");
let components = components_builder.build_components(&builder_ctx).await?;
let consensus: Arc<dyn Consensus> = Arc::new(components.consensus().clone());
let tree_externals = TreeExternals::new(
ctx.provider_factory().clone(),
consensus.clone(),
components.block_executor().clone(),
);
let tree = BlockchainTree::new(tree_externals, *ctx.tree_config(), ctx.prune_modes())?
.with_sync_metrics_tx(ctx.sync_metrics_tx())
// Note: This is required because we need to ensure that both the components and the
// tree are using the same channel for canon state notifications. This will be removed
// once the Blockchain provider no longer depends on an instance of the tree
.with_canon_state_notification_sender(ctx.canon_state_notification_sender());
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
// Replace the tree component with the actual tree
let blockchain_db = ctx.blockchain_db().clone().with_tree(blockchain_tree);
debug!(target: "reth::cli", "configured blockchain tree");
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
let node_adapter = NodeAdapter {
components,
task_executor: ctx.task_executor().clone(),
provider: blockchain_db.clone(),
};
debug!(target: "reth::cli", "calling on_component_initialized hook");
on_component_initialized.on_event(node_adapter.clone())?;
// passing FullNodeTypes as type parameter here so that we can build
// later the components.
.with_blockchain_db::<T>().await?
.with_components(components_builder, on_component_initialized).await?;
// spawn exexs
let exex_manager_handle =
ExExLauncher::new(head, node_adapter.clone(), installed_exex, ctx.configs().clone())
.launch()
.await;
let exex_manager_handle = ExExLauncher::new(
ctx.head(),
ctx.node_adapter().clone(),
installed_exex,
ctx.configs().clone(),
)
.launch()
.await;
// create pipeline
let network_client = node_adapter.network().fetch_client().await?;
let network_client = ctx.components().network().fetch_client().await?;
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
let node_config = ctx.node_config();
@ -216,30 +177,30 @@ where
// install auto-seal
let mining_mode =
ctx.dev_mining_mode(node_adapter.components.pool().pending_transactions_listener());
ctx.dev_mining_mode(ctx.components().pool().pending_transactions_listener());
info!(target: "reth::cli", mode=%mining_mode, "configuring dev mining mode");
let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new(
ctx.chain_spec(),
blockchain_db.clone(),
node_adapter.components.pool().clone(),
ctx.blockchain_db().clone(),
ctx.components().pool().clone(),
consensus_engine_tx.clone(),
mining_mode,
node_adapter.components.block_executor().clone(),
ctx.components().block_executor().clone(),
)
.build();
let pipeline = crate::setup::build_networked_pipeline(
&ctx.toml_config().stages,
client.clone(),
consensus.clone(),
ctx.consensus(),
ctx.provider_factory().clone(),
ctx.task_executor(),
ctx.sync_metrics_tx(),
ctx.prune_config(),
max_block,
static_file_producer,
node_adapter.components.block_executor().clone(),
ctx.components().block_executor().clone(),
pipeline_exex_handle,
)
.await?;
@ -254,14 +215,14 @@ where
let pipeline = crate::setup::build_networked_pipeline(
&ctx.toml_config().stages,
network_client.clone(),
consensus.clone(),
ctx.consensus(),
ctx.provider_factory().clone(),
ctx.task_executor(),
ctx.sync_metrics_tx(),
ctx.prune_config(),
max_block,
static_file_producer,
node_adapter.components.block_executor().clone(),
ctx.components().block_executor().clone(),
pipeline_exex_handle,
)
.await?;
@ -290,11 +251,11 @@ where
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
pipeline,
blockchain_db.clone(),
ctx.blockchain_db().clone(),
Box::new(ctx.task_executor().clone()),
Box::new(node_adapter.components.network().clone()),
Box::new(ctx.components().network().clone()),
max_block,
node_adapter.components.payload_builder().clone(),
ctx.components().payload_builder().clone(),
initial_target,
reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx,
@ -304,12 +265,12 @@ where
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select!(
node_adapter.components.network().event_listener().map(Into::into),
ctx.components().network().event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone()))
ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
.map(Into::into),
)
} else {
@ -321,8 +282,8 @@ where
ctx.task_executor().spawn_critical(
"events task",
node::handle_events(
Some(node_adapter.components.network().clone()),
Some(head.number),
Some(ctx.components().network().clone()),
Some(ctx.head().number),
events,
database.clone(),
),
@ -335,10 +296,10 @@ where
commit: VERGEN_GIT_SHA.to_string(),
};
let engine_api = EngineApi::new(
blockchain_db.clone(),
ctx.blockchain_db().clone(),
ctx.chain_spec(),
beacon_engine_handle,
node_adapter.components.payload_builder().clone().into(),
ctx.components().payload_builder().clone().into(),
Box::new(ctx.task_executor().clone()),
client,
);
@ -349,7 +310,7 @@ where
// Start RPC servers
let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers(
node_adapter.clone(),
ctx.node_adapter().clone(),
engine_api,
ctx.node_config(),
jwt_secret,
@ -413,12 +374,12 @@ where
}
let full_node = FullNode {
evm_config: node_adapter.components.evm_config().clone(),
block_executor: node_adapter.components.block_executor().clone(),
pool: node_adapter.components.pool().clone(),
network: node_adapter.components.network().clone(),
provider: node_adapter.provider.clone(),
payload_builder: node_adapter.components.payload_builder().clone(),
evm_config: ctx.components().evm_config().clone(),
block_executor: ctx.components().block_executor().clone(),
pool: ctx.components().pool().clone(),
network: ctx.components().network().clone(),
provider: ctx.node_adapter().provider.clone(),
payload_builder: ctx.components().payload_builder().clone(),
task_executor: ctx.task_executor().clone(),
rpc_server_handles,
rpc_registry,