feat: extend LaunchContext with more components (#8937)

Author: Federico Gimenez
Date: 2024-06-18 18:47:28 +02:00
Committed by: GitHub
parent 8d4cf43d6e
commit 655799f5bf
2 changed files with 175 additions and 63 deletions
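In short: the launch sequence gains two new chainable stages, `with_metrics` and `with_blockchain_db`, which absorb setup the launcher previously did inline. A condensed sketch of the resulting call chain, assembled from the launcher changes below (surrounding setup and error handling elided):

let ctx = ctx
    .with_genesis()?
    .inspect(|this| {
        info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
    })
    // attaches WithMeteredProvider and spawns the stages metrics listener
    .with_metrics()
    // attaches WithMeteredProviders: BlockchainProvider, tree config and the
    // canon state notification sender
    .with_blockchain_db().await?;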

Changed file 1 of 2:

@@ -5,6 +5,7 @@ use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_auto_seal_consensus::MiningMode;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_chainspec::{Chain, ChainSpec};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
@@ -17,16 +18,22 @@ use reth_node_core::{
node_config::NodeConfig,
};
use reth_primitives::{BlockNumber, Head, B256};
use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{
providers::{BlockchainProvider, StaticFileProvider},
CanonStateNotificationSender, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::{mpsc::Receiver, oneshot, watch};
use tokio::sync::{
mpsc::{unbounded_channel, Receiver, UnboundedSender},
oneshot, watch,
};
/// Reusable setup for launching a node.
///
@@ -438,34 +445,6 @@ where
self.right().static_file_provider()
}
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}
/// Convenience function to [`Self::init_genesis`]
pub fn with_genesis(self) -> Result<Self, InitDatabaseError> {
init_genesis(self.provider_factory().clone())?;
Ok(self)
}
/// Write the genesis block and state if it has not already been written
pub fn init_genesis(&self) -> Result<B256, InitDatabaseError> {
init_genesis(self.provider_factory().clone())
}
/// Returns the max block that the node should run to, looking it up from the network if
/// necessary
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
C: HeadersClient,
{
self.node_config().max_block(client, self.provider_factory().clone()).await
}
/// Convenience function to [`Self::start_prometheus_endpoint`]
pub async fn with_prometheus(self) -> eyre::Result<Self> {
self.start_prometheus_endpoint().await?;
@@ -485,6 +464,120 @@ where
.await
}
/// Convenience function to [`Self::init_genesis`]
pub fn with_genesis(self) -> Result<Self, InitDatabaseError> {
init_genesis(self.provider_factory().clone())?;
Ok(self)
}
/// Write the genesis block and state if it has not already been written
pub fn init_genesis(&self) -> Result<B256, InitDatabaseError> {
init_genesis(self.provider_factory().clone())
}
/// Creates a new `WithMeteredProvider` container and attaches it to the
/// launch context.
pub fn with_metrics(self) -> LaunchContextWith<Attached<WithConfigs, WithMeteredProvider<DB>>> {
let (metrics_sender, metrics_receiver) = unbounded_channel();
let with_metrics =
WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
debug!(target: "reth::cli", "Spawning stages metrics listener task");
let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| with_metrics),
}
}
}
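For orientation, `with_metrics` follows the crate's type-state pattern: `map_right` swaps the right-hand attachment, so accessors defined on the richer attachment only become callable once this stage has run. A minimal self-contained sketch of that pattern with toy stand-in types (not reth's real generics):

struct Attached<L, R> { left: L, right: R }

impl<L, R> Attached<L, R> {
    // Swap the right-hand attachment while keeping the left-hand one.
    fn map_right<T>(self, f: impl FnOnce(R) -> T) -> Attached<L, T> {
        Attached { left: self.left, right: f(self.right) }
    }
}

struct LaunchContextWith<A> { attachment: A }

struct DbOnly;              // stand-in for the bare provider-factory attachment
struct WithMetrics(DbOnly); // stand-in for WithMeteredProvider

impl LaunchContextWith<Attached<(), DbOnly>> {
    // Mirrors `with_metrics`: consume the context, return one whose type records
    // that the metrics components now exist.
    fn with_metrics(self) -> LaunchContextWith<Attached<(), WithMetrics>> {
        LaunchContextWith { attachment: self.attachment.map_right(WithMetrics) }
    }
}

fn main() {
    let ctx = LaunchContextWith { attachment: Attached { left: (), right: DbOnly } };
    let _metered = ctx.with_metrics(); // now typed as holding WithMetrics
}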
impl<DB> LaunchContextWith<Attached<WithConfigs, WithMeteredProvider<DB>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
{
/// Returns the configured `ProviderFactory`.
const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().provider_factory
}
/// Returns the metrics sender.
fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().metrics_sender.clone()
}
/// Creates a `BlockchainProvider` and attaches it to the launch context.
pub async fn with_blockchain_db(
self,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>> {
let tree_config = BlockchainTreeConfig::default();
// NOTE: This is a temporary workaround to provide the canon state notification sender
// to the components builder because there's a cyclic dependency between the blockchain
// provider and the tree component. This will be removed once the Blockchain provider no
// longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_db = BlockchainProvider::new(
self.provider_factory().clone(),
Arc::new(NoopBlockchainTree::with_canon_state_notifications(
canon_state_notification_sender.clone(),
)),
)?;
let metered_providers = WithMeteredProviders {
provider_factory: self.provider_factory().clone(),
blockchain_db,
metrics_sender: self.sync_metrics_tx(),
tree_config,
canon_state_notification_sender,
};
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| metered_providers),
};
Ok(ctx)
}
}
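The broadcast channel above is sized to `max_reorg_depth() * 2`, presumably to leave headroom so one reorg's worth of canon-state notifications cannot lag out subscribers. A standalone sketch of that sizing; the depth of 64 is an assumed `BlockchainTreeConfig` default, not taken from this diff:

use tokio::sync::broadcast;

fn main() {
    let max_reorg_depth: u64 = 64; // assumed default, for illustration only
    // Mirrors `with_blockchain_db`: capacity is twice the reorg depth.
    let capacity = max_reorg_depth as usize * 2;
    assert_eq!(capacity, 128);
    let (tx, mut rx) = broadcast::channel::<u64>(capacity);
    tx.send(7).unwrap();
    assert_eq!(rx.try_recv().unwrap(), 7);
}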
impl<DB> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
{
/// Returns access to the underlying database.
pub fn database(&self) -> &DB {
self.provider_factory().db_ref()
}
/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().provider_factory
}
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
}
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}
/// Returns the max block that the node should run to, looking it up from the network if
/// necessary
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
C: HeadersClient,
{
self.node_config().max_block(client, self.provider_factory().clone()).await
}
/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
@@ -493,6 +586,26 @@ where
.lookup_head(self.provider_factory().clone())
.wrap_err("the head block is missing")
}
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().metrics_sender.clone()
}
/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}
/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}
/// Returns the `CanonStateNotificationSender`.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
}
}
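Note that `sync_metrics_tx` hands out a fresh clone per consumer (pipeline, tree, pruner); `UnboundedSender` clones are cheap, and every clone feeds the single listener spawned in `with_metrics`. A minimal sketch of that fan-in:

use tokio::sync::mpsc::unbounded_channel;

fn main() {
    // One receiver (the metrics listener), many cloned senders.
    let (tx, mut rx) = unbounded_channel::<&'static str>();
    let tx_pipeline = tx.clone();
    let tx_tree = tx.clone();
    tx_pipeline.send("pipeline event").unwrap();
    tx_tree.send("tree event").unwrap();
    assert_eq!(rx.try_recv().unwrap(), "pipeline event");
    assert_eq!(rx.try_recv().unwrap(), "tree event");
}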
/// Joins two attachments together.
@@ -555,6 +668,25 @@ pub struct WithConfigs {
pub toml_config: reth_config::Config,
}
/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB> {
provider_factory: ProviderFactory<DB>,
blockchain_db: BlockchainProvider<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig,
}
/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
}
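On the differing attributes: `WithMeteredProvider` can derive `Debug` because all of its fields are `Debug`, while `WithMeteredProviders` opts out via the lint allow, presumably because at least one field (such as the blockchain provider) lacks a `Debug` impl. A toy sketch of the distinction:

#[derive(Debug)]
struct Sender; // stand-in for UnboundedSender<MetricEvent>, which is Debug

struct NoDebug; // stand-in for a field without a Debug impl

#[derive(Debug)]
struct WithMeteredProvider { metrics_sender: Sender }

#[allow(missing_debug_implementations)] // mirrors the diff; silences the crate-level lint
struct WithMeteredProviders { metrics_sender: Sender, blockchain_db: NoDebug }

fn main() {
    println!("{:?}", WithMeteredProvider { metrics_sender: Sender });
    let _providers = WithMeteredProviders { metrics_sender: Sender, blockchain_db: NoDebug };
}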
#[cfg(test)]
mod tests {
use super::{LaunchContext, NodeConfig};

Changed file 2 of 2:

@@ -12,10 +12,7 @@ use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine,
};
use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_blockchain_tree::{BlockchainTree, ShareableBlockchainTree, TreeExternals};
use reth_consensus::Consensus;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_exex::ExExManagerHandle;
@@ -128,33 +125,16 @@ where
.with_genesis()?
.inspect(|this| {
info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
});
debug!(target: "reth::cli", "Spawning stages metrics listener task");
let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel();
let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx);
ctx.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
})
.with_metrics()
.with_blockchain_db().await?;
// fetch the head block from the database
let head = ctx.lookup_head()?;
// Configure the blockchain tree for the node
let tree_config = BlockchainTreeConfig::default();
// NOTE: This is a temporary workaround to provide the canon state notification sender
// to the components builder because there's a cyclic dependency between the blockchain
// provider and the tree component. This will be removed once the Blockchain provider no
// longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_db = BlockchainProvider::new(
ctx.provider_factory().clone(),
Arc::new(NoopBlockchainTree::with_canon_state_notifications(
canon_state_notification_sender.clone(),
)),
)?;
let builder_ctx = BuilderContext::new(
head,
blockchain_db.clone(),
ctx.blockchain_db().clone(),
ctx.task_executor().clone(),
ctx.configs().clone(),
);
@@ -169,17 +149,17 @@ where
consensus.clone(),
components.block_executor().clone(),
);
let tree = BlockchainTree::new(tree_externals, tree_config, ctx.prune_modes())?
.with_sync_metrics_tx(sync_metrics_tx.clone())
let tree = BlockchainTree::new(tree_externals, *ctx.tree_config(), ctx.prune_modes())?
.with_sync_metrics_tx(ctx.sync_metrics_tx())
// Note: This is required because we need to ensure that both the components and the
// tree are using the same channel for canon state notifications. This will be removed
// once the Blockchain provider no longer depends on an instance of the tree
.with_canon_state_notification_sender(canon_state_notification_sender);
.with_canon_state_notification_sender(ctx.canon_state_notification_sender());
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
// Replace the tree component with the actual tree
let blockchain_db = blockchain_db.with_tree(blockchain_tree);
let blockchain_db = ctx.blockchain_db().clone().with_tree(blockchain_tree);
debug!(target: "reth::cli", "configured blockchain tree");
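The note above is easy to verify in isolation: broadcast receivers only see messages sent on the sender they subscribed to, so handing the components and the tree two different senders would silently split the notification stream. A minimal sketch:

use tokio::sync::broadcast;

fn main() {
    let (a, mut a_rx) = broadcast::channel::<u8>(8);
    let (b, _b_rx) = broadcast::channel::<u8>(8);
    b.send(1).unwrap(); // reaches `b` subscribers only
    a.send(2).unwrap();
    assert_eq!(a_rx.try_recv().unwrap(), 2); // the send on `b` never arrives here
}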
@@ -255,7 +235,7 @@ where
consensus.clone(),
ctx.provider_factory().clone(),
ctx.task_executor(),
sync_metrics_tx,
ctx.sync_metrics_tx(),
ctx.prune_config(),
max_block,
static_file_producer,
@@ -277,7 +257,7 @@ where
consensus.clone(),
ctx.provider_factory().clone(),
ctx.task_executor(),
sync_metrics_tx,
ctx.sync_metrics_tx(),
ctx.prune_config(),
max_block,
static_file_producer,
@@ -294,7 +274,7 @@ where
let initial_target = ctx.node_config().debug.tip;
let mut pruner_builder =
ctx.pruner_builder().max_reorg_depth(tree_config.max_reorg_depth() as usize);
ctx.pruner_builder().max_reorg_depth(ctx.tree_config().max_reorg_depth() as usize);
if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());