feat: add ExExLauncher (#8693)

This commit is contained in:
Matthias Seitz
2024-06-10 13:14:00 +02:00
committed by GitHub
parent 53f4ecade4
commit ee7a829a12
4 changed files with 142 additions and 81 deletions

View File

@ -5,7 +5,7 @@ use reth_node_api::FullNodeComponents;
use std::future::Future; use std::future::Future;
/// A trait for launching an `ExEx`. /// A trait for launching an `ExEx`.
trait LaunchExEx<Node: FullNodeComponents>: Send { pub trait LaunchExEx<Node: FullNodeComponents>: Send {
/// Launches the `ExEx`. /// Launches the `ExEx`.
/// ///
/// The `ExEx` should be able to run independently and emit events on the channels provided in /// The `ExEx` should be able to run independently and emit events on the channels provided in
@ -16,10 +16,12 @@ trait LaunchExEx<Node: FullNodeComponents>: Send {
) -> impl Future<Output = eyre::Result<impl Future<Output = eyre::Result<()>> + Send>> + Send; ) -> impl Future<Output = eyre::Result<impl Future<Output = eyre::Result<()>> + Send>> + Send;
} }
type BoxExEx = BoxFuture<'static, eyre::Result<()>>; /// A boxed exex future.
pub type BoxExEx = BoxFuture<'static, eyre::Result<()>>;
/// A version of [`LaunchExEx`] that returns a boxed future. Makes the trait object-safe. /// A version of [`LaunchExEx`] that returns a boxed future. Makes the trait object-safe.
pub(crate) trait BoxedLaunchExEx<Node: FullNodeComponents>: Send { pub trait BoxedLaunchExEx<Node: FullNodeComponents>: Send {
/// Launches the `ExEx` and returns a boxed future.
fn launch(self: Box<Self>, ctx: ExExContext<Node>) fn launch(self: Box<Self>, ctx: ExExContext<Node>)
-> BoxFuture<'static, eyre::Result<BoxExEx>>; -> BoxFuture<'static, eyre::Result<BoxExEx>>;
} }

View File

@ -254,6 +254,11 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
self self
} }
/// Returns the container for all config types
pub const fn configs(&self) -> &WithConfigs {
self.attachment.left()
}
/// Returns the attached [`NodeConfig`]. /// Returns the attached [`NodeConfig`].
pub const fn node_config(&self) -> &NodeConfig { pub const fn node_config(&self) -> &NodeConfig {
&self.left().config &self.left().config

View File

@ -0,0 +1,122 @@
//! Support for launching execution extensions.
use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
use futures::future;
use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle};
use reth_node_api::FullNodeComponents;
use reth_primitives::Head;
use reth_provider::CanonStateSubscriptions;
use reth_tracing::tracing::{debug, info};
use std::{fmt, fmt::Debug};
/// Can launch execution extensions.
pub struct ExExLauncher<Node: FullNodeComponents> {
head: Head,
extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
components: Node,
config_container: WithConfigs,
}
impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
/// Create a new `ExExLauncher` with the given extensions.
pub const fn new(
head: Head,
components: Node,
extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
config_container: WithConfigs,
) -> Self {
Self { head, extensions, components, config_container }
}
/// Launches all execution extensions.
///
/// Spawns all extensions and returns the handle to the exex manager if any extensions are
/// installed.
pub async fn launch(self) -> Option<ExExManagerHandle> {
let Self { head, extensions, components, config_container } = self;
if extensions.is_empty() {
// nothing to launch
return None
}
let mut exex_handles = Vec::with_capacity(extensions.len());
let mut exexs = Vec::with_capacity(extensions.len());
for (id, exex) in extensions {
// create a new exex handle
let (handle, events, notifications) = ExExHandle::new(id.clone());
exex_handles.push(handle);
// create the launch context for the exex
let context = ExExContext {
head,
config: config_container.config.clone(),
reth_config: config_container.toml_config.clone(),
components: components.clone(),
events,
notifications,
};
let executor = components.task_executor().clone();
exexs.push(async move {
debug!(target: "reth::cli", id, "spawning exex");
let span = reth_tracing::tracing::info_span!("exex", id);
let _enter = span.enter();
// init the exex
let exex = exex.launch(context).await.unwrap();
// spawn it as a crit task
executor.spawn_critical("exex", async move {
info!(target: "reth::cli", "ExEx started");
match exex.await {
Ok(_) => panic!("ExEx {id} finished. ExEx's should run indefinitely"),
Err(err) => panic!("ExEx {id} crashed: {err}"),
}
});
});
}
future::join_all(exexs).await;
// spawn exex manager
debug!(target: "reth::cli", "spawning exex manager");
// todo(onbjerg): rm magic number
let exex_manager = ExExManager::new(exex_handles, 1024);
let exex_manager_handle = exex_manager.handle();
components.task_executor().spawn_critical("exex manager", async move {
exex_manager.await.expect("exex manager crashed");
});
// send notifications from the blockchain tree to exex manager
let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
let mut handle = exex_manager_handle.clone();
components.task_executor().spawn_critical(
"exex manager blockchain tree notifications",
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
.send_async(notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}
},
);
info!(target: "reth::cli", "ExEx Manager started");
Some(exex_manager_handle)
}
}
impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExExLauncher")
.field("head", &self.head)
.field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
.field("components", &"...")
.field("config_container", &self.config_container)
.finish()
}
}

View File

@ -4,11 +4,10 @@ use crate::{
builder::{NodeAdapter, NodeAddOns, NodeTypesAdapter}, builder::{NodeAdapter, NodeAddOns, NodeTypesAdapter},
components::{NodeComponents, NodeComponentsBuilder}, components::{NodeComponents, NodeComponentsBuilder},
hooks::NodeHooks, hooks::NodeHooks,
launch::common::WithConfigs,
node::FullNode, node::FullNode,
BuilderContext, NodeBuilderWithComponents, NodeHandle, BuilderContext, NodeBuilderWithComponents, NodeHandle,
}; };
use futures::{future, future::Either, stream, stream_select, StreamExt}; use futures::{future::Either, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::AutoSealConsensus; use reth_auto_seal_consensus::AutoSealConsensus;
use reth_beacon_consensus::{ use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook}, hooks::{EngineHooks, PruneHook, StaticFileHook},
@ -19,7 +18,7 @@ use reth_blockchain_tree::{
TreeExternals, TreeExternals,
}; };
use reth_consensus::Consensus; use reth_consensus::Consensus;
use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; use reth_exex::ExExManagerHandle;
use reth_network::NetworkEvents; use reth_network::NetworkEvents;
use reth_node_api::{FullNodeComponents, FullNodeTypes}; use reth_node_api::{FullNodeComponents, FullNodeTypes};
use reth_node_core::{ use reth_node_core::{
@ -30,7 +29,7 @@ use reth_node_core::{
}; };
use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_primitives::format_ether; use reth_primitives::format_ether;
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions}; use reth_provider::providers::BlockchainProvider;
use reth_rpc_engine_api::EngineApi; use reth_rpc_engine_api::EngineApi;
use reth_rpc_types::engine::ClientVersionV1; use reth_rpc_types::engine::ClientVersionV1;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
@ -42,6 +41,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
pub mod common; pub mod common;
pub use common::LaunchContext; pub use common::LaunchContext;
mod exex;
pub use exex::ExExLauncher;
/// A general purpose trait that launches a new node of any kind. /// A general purpose trait that launches a new node of any kind.
/// ///
@ -157,15 +158,11 @@ where
)), )),
)?; )?;
let config_container = WithConfigs {
config: ctx.node_config().clone(),
toml_config: ctx.toml_config().clone(),
};
let builder_ctx = BuilderContext::new( let builder_ctx = BuilderContext::new(
head, head,
blockchain_db.clone(), blockchain_db.clone(),
ctx.task_executor().clone(), ctx.task_executor().clone(),
config_container, ctx.configs().clone(),
); );
debug!(target: "reth::cli", "creating components"); debug!(target: "reth::cli", "creating components");
@ -202,75 +199,10 @@ where
on_component_initialized.on_event(node_adapter.clone())?; on_component_initialized.on_event(node_adapter.clone())?;
// spawn exexs // spawn exexs
let mut exex_handles = Vec::with_capacity(installed_exex.len()); let exex_manager_handle =
let mut exexs = Vec::with_capacity(installed_exex.len()); ExExLauncher::new(head, node_adapter.clone(), installed_exex, ctx.configs().clone())
for (id, exex) in installed_exex { .launch()
// create a new exex handle .await;
let (handle, events, notifications) = ExExHandle::new(id.clone());
exex_handles.push(handle);
// create the launch context for the exex
let context = ExExContext {
head,
config: ctx.node_config().clone(),
reth_config: ctx.toml_config().clone(),
components: node_adapter.clone(),
events,
notifications,
};
let executor = ctx.task_executor().clone();
exexs.push(async move {
debug!(target: "reth::cli", id, "spawning exex");
let span = reth_tracing::tracing::info_span!("exex", id);
let _enter = span.enter();
// init the exex
let exex = exex.launch(context).await.unwrap();
// spawn it as a crit task
executor.spawn_critical("exex", async move {
info!(target: "reth::cli", "ExEx started");
match exex.await {
Ok(_) => panic!("ExEx {id} finished. ExEx's should run indefinitely"),
Err(err) => panic!("ExEx {id} crashed: {err}"),
}
});
});
}
future::join_all(exexs).await;
// spawn exex manager
let exex_manager_handle = if !exex_handles.is_empty() {
debug!(target: "reth::cli", "spawning exex manager");
// todo(onbjerg): rm magic number
let exex_manager = ExExManager::new(exex_handles, 1024);
let exex_manager_handle = exex_manager.handle();
ctx.task_executor().spawn_critical("exex manager", async move {
exex_manager.await.expect("exex manager crashed");
});
// send notifications from the blockchain tree to exex manager
let mut canon_state_notifications = blockchain_db.subscribe_to_canonical_state();
let mut handle = exex_manager_handle.clone();
ctx.task_executor().spawn_critical(
"exex manager blockchain tree notifications",
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle.send_async(notification.into()).await.expect(
"blockchain tree notification could not be sent to exex manager",
);
}
},
);
info!(target: "reth::cli", "ExEx Manager started");
Some(exex_manager_handle)
} else {
None
};
// create pipeline // create pipeline
let network_client = node_adapter.network().fetch_client().await?; let network_client = node_adapter.network().fetch_client().await?;