feat(node-builder): ExEx (Execution Extensions) installation (#7235)

Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Alexey Shekhirin
2024-03-28 02:39:57 +01:00
committed by GitHub
parent 8d488f5982
commit 83cd848639
8 changed files with 226 additions and 8 deletions

1
Cargo.lock generated
View File

@ -6199,6 +6199,7 @@ name = "reth-node-ethereum"
version = "0.2.0-beta.4" version = "0.2.0-beta.4"
dependencies = [ dependencies = [
"eyre", "eyre",
"futures",
"reth-basic-payload-builder", "reth-basic-payload-builder",
"reth-db", "reth-db",
"reth-ethereum-payload-builder", "reth-ethereum-payload-builder",

View File

@ -7,13 +7,14 @@ use crate::{
ComponentsBuilder, FullNodeComponents, FullNodeComponentsAdapter, NodeComponents, ComponentsBuilder, FullNodeComponents, FullNodeComponentsAdapter, NodeComponents,
NodeComponentsBuilder, PoolBuilder, NodeComponentsBuilder, PoolBuilder,
}, },
exex::{BoxedLaunchExEx, ExExContext},
hooks::NodeHooks, hooks::NodeHooks,
node::{FullNode, FullNodeTypes, FullNodeTypesAdapter}, node::{FullNode, FullNodeTypes, FullNodeTypesAdapter},
rpc::{RethRpcServerHandles, RpcContext, RpcHooks}, rpc::{RethRpcServerHandles, RpcContext, RpcHooks},
Node, NodeHandle, Node, NodeHandle,
}; };
use eyre::Context; use eyre::Context;
use futures::{future::Either, stream, stream_select, StreamExt}; use futures::{future::Either, stream, stream_select, Future, StreamExt};
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use reth_beacon_consensus::{ use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook}, hooks::{EngineHooks, PruneHook, StaticFileHook},
@ -318,6 +319,7 @@ where
components_builder, components_builder,
hooks: NodeHooks::new(), hooks: NodeHooks::new(),
rpc: RpcHooks::new(), rpc: RpcHooks::new(),
exexs: Vec::new(),
}, },
} }
} }
@ -352,6 +354,7 @@ where
components_builder: f(self.state.components_builder), components_builder: f(self.state.components_builder),
hooks: self.state.hooks, hooks: self.state.hooks,
rpc: self.state.rpc, rpc: self.state.rpc,
exexs: self.state.exexs,
}, },
} }
} }
@ -429,6 +432,26 @@ where
self self
} }
/// Installs an ExEx (Execution Extension) in the node.
pub fn install_exex<F, R, E>(mut self, exex: F) -> Self
where
F: Fn(
ExExContext<
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
) -> R
+ Send
+ 'static,
R: Future<Output = eyre::Result<E>> + Send,
E: Future<Output = eyre::Result<()>> + Send,
{
self.state.exexs.push(Box::new(exex));
self
}
/// Launches the node and returns a handle to it. /// Launches the node and returns a handle to it.
/// ///
/// This bootstraps the node internals, creates all the components with the provider /// This bootstraps the node internals, creates all the components with the provider
@ -452,7 +475,7 @@ where
let Self { let Self {
config, config,
state: ComponentsState { types, components_builder, hooks, rpc }, state: ComponentsState { types, components_builder, hooks, rpc, exexs: _ },
database, database,
} = self; } = self;
@ -529,6 +552,8 @@ where
let NodeComponents { transaction_pool, network, payload_builder } = let NodeComponents { transaction_pool, network, payload_builder } =
components_builder.build_components(&ctx).await?; components_builder.build_components(&ctx).await?;
// TODO(alexey): launch ExExs and consume their events
let BuilderContext { let BuilderContext {
provider: blockchain_db, provider: blockchain_db,
executor, executor,
@ -1059,7 +1084,6 @@ where
} }
/// Captures the necessary context for building the components of the node. /// Captures the necessary context for building the components of the node.
#[derive(Debug)]
pub struct BuilderContext<Node: FullNodeTypes> { pub struct BuilderContext<Node: FullNodeTypes> {
/// The current head of the blockchain at launch. /// The current head of the blockchain at launch.
head: Head, head: Head,
@ -1075,6 +1099,18 @@ pub struct BuilderContext<Node: FullNodeTypes> {
reth_config: reth_config::Config, reth_config: reth_config::Config,
} }
impl<Node: FullNodeTypes> std::fmt::Debug for BuilderContext<Node> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BuilderContext")
.field("head", &self.head)
.field("provider", &std::any::type_name::<Node::Provider>())
.field("executor", &self.executor)
.field("data_dir", &self.data_dir)
.field("config", &self.config)
.finish()
}
}
impl<Node: FullNodeTypes> BuilderContext<Node> { impl<Node: FullNodeTypes> BuilderContext<Node> {
/// Create a new instance of [BuilderContext] /// Create a new instance of [BuilderContext]
pub fn new( pub fn new(
@ -1217,7 +1253,6 @@ where
/// ///
/// Additionally, this state captures additional hooks that are called at specific points in the /// Additionally, this state captures additional hooks that are called at specific points in the
/// node's launch lifecycle. /// node's launch lifecycle.
#[derive(Debug)]
pub struct ComponentsState<Types, Components, FullNode: FullNodeComponents> { pub struct ComponentsState<Types, Components, FullNode: FullNodeComponents> {
/// The types of the node. /// The types of the node.
types: Types, types: Types,
@ -1227,4 +1262,20 @@ pub struct ComponentsState<Types, Components, FullNode: FullNodeComponents> {
hooks: NodeHooks<FullNode>, hooks: NodeHooks<FullNode>,
/// Additional RPC hooks. /// Additional RPC hooks.
rpc: RpcHooks<FullNode>, rpc: RpcHooks<FullNode>,
/// The ExExs (execution extensions) of the node.
exexs: Vec<Box<dyn BoxedLaunchExEx<FullNode>>>,
}
impl<Types, Components, FullNode: FullNodeComponents> std::fmt::Debug
for ComponentsState<Types, Components, FullNode>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ComponentsState")
.field("types", &std::any::type_name::<Types>())
.field("components_builder", &std::any::type_name::<Components>())
.field("hooks", &self.hooks)
.field("rpc", &self.rpc)
.field("exexs", &self.exexs.len())
.finish()
}
} }

View File

@ -94,7 +94,7 @@ where
where where
PB: PoolBuilder<Node>, PB: PoolBuilder<Node>,
{ {
let Self { payload_builder, network_builder, _marker, .. } = self; let Self { pool_builder: _, payload_builder, network_builder, _marker } = self;
ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker }
} }
} }
@ -112,7 +112,7 @@ where
where where
NB: NetworkBuilder<Node, PoolB::Pool>, NB: NetworkBuilder<Node, PoolB::Pool>,
{ {
let Self { payload_builder, pool_builder, _marker, .. } = self; let Self { pool_builder, payload_builder, network_builder: _, _marker } = self;
ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker }
} }
@ -124,7 +124,7 @@ where
where where
PB: PayloadServiceBuilder<Node, PoolB::Pool>, PB: PayloadServiceBuilder<Node, PoolB::Pool>,
{ {
let Self { pool_builder, network_builder, _marker, .. } = self; let Self { pool_builder, payload_builder: _, network_builder, _marker } = self;
ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker }
} }
} }

View File

@ -0,0 +1,127 @@
#![allow(dead_code)]
// todo: expand this (examples, assumptions, invariants)
//! Execution extensions (ExEx).
//!
//! An execution extension is a task that derives its state from Reth's state.
//!
//! Some examples of state such state derives are rollups, bridges, and indexers.
//!
//! An ExEx is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth.
//!
//! ExEx's are initialized using an async closure that resolves to the ExEx; this closure gets
//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth.
//!
//! Most ExEx's will want to derive their state from the [`CanonStateNotification`] channel given in
//! [`ExExContext`]. A new notification is emitted whenever blocks are executed in live and
//! historical sync.
//!
//! # Pruning
//!
//! ExEx's **SHOULD** emit an `ExExEvent::FinishedHeight` event to signify what blocks have been
//! processed. This event is used by Reth to determine what state can be pruned.
//!
//! An ExEx will not receive notifications for blocks less than the block emitted in the event. To
//! clarify: if the ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for any
//! `block_number >= 0`.
//!
//! [`Future`]: std::future::Future
//! [`ExExContext`]: crate::exex::ExExContext
//! [`CanonStateNotification`]: reth_provider::CanonStateNotification
use crate::FullNodeTypes;
use futures::{future::BoxFuture, FutureExt};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
};
use reth_primitives::{BlockNumber, Head};
use reth_tasks::TaskExecutor;
use std::future::Future;
/// Events emitted by an ExEx.
#[derive(Debug)]
pub enum ExExEvent {
/// Highest block processed by the ExEx.
///
/// The ExEx must guarantee that it will not require all earlier blocks in the future, meaning
/// that Reth is allowed to prune them.
///
/// On reorgs, it's possible for the height to go down.
FinishedHeight(BlockNumber),
}
/// Captures the context that an ExEx has access to.
#[derive(Clone, Debug)]
pub struct ExExContext<Node: FullNodeTypes> {
/// The current head of the blockchain at launch.
pub head: Head,
/// The configured provider to interact with the blockchain.
pub provider: Node::Provider,
/// The task executor of the node.
pub task_executor: TaskExecutor,
/// The data dir of the node.
pub data_dir: ChainPath<DataDirPath>,
/// The config of the node
pub config: NodeConfig,
/// The loaded node config
pub reth_config: reth_config::Config,
// TODO(alexey): add pool, payload builder, anything else?
}
/// A trait for launching an ExEx.
trait LaunchExEx<Node: FullNodeTypes>: Send {
/// Launches the ExEx.
///
/// The ExEx should be able to run independently and emit events on the channels provided in
/// the [`ExExContext`].
fn launch(
self,
ctx: ExExContext<Node>,
) -> impl Future<Output = eyre::Result<impl Future<Output = eyre::Result<()>> + Send>> + Send;
}
type BoxExEx = BoxFuture<'static, eyre::Result<()>>;
/// A version of [LaunchExEx] that returns a boxed future. Makes the trait object-safe.
pub(crate) trait BoxedLaunchExEx<Node: FullNodeTypes>: Send {
fn launch(self: Box<Self>, ctx: ExExContext<Node>)
-> BoxFuture<'static, eyre::Result<BoxExEx>>;
}
/// Implements [BoxedLaunchExEx] for any [LaunchExEx] that is [Send] and `'static`.
///
/// Returns a [BoxFuture] that resolves to a [BoxExEx].
impl<E, Node> BoxedLaunchExEx<Node> for E
where
E: LaunchExEx<Node> + Send + 'static,
Node: FullNodeTypes,
{
fn launch(
self: Box<Self>,
ctx: ExExContext<Node>,
) -> BoxFuture<'static, eyre::Result<BoxExEx>> {
async move {
let exex = LaunchExEx::launch(*self, ctx).await?;
Ok(Box::pin(exex) as BoxExEx)
}
.boxed()
}
}
/// Implements `LaunchExEx` for any closure that takes an [ExExContext] and returns a future
/// resolving to an ExEx.
impl<Node, F, Fut, E> LaunchExEx<Node> for F
where
Node: FullNodeTypes,
F: FnOnce(ExExContext<Node>) -> Fut + Send,
Fut: Future<Output = eyre::Result<E>> + Send,
E: Future<Output = eyre::Result<()>> + Send,
{
fn launch(
self,
ctx: ExExContext<Node>,
) -> impl Future<Output = eyre::Result<impl Future<Output = eyre::Result<()>> + Send>> + Send
{
self(ctx)
}
}

View File

@ -27,6 +27,9 @@ pub use handle::NodeHandle;
pub mod provider; pub mod provider;
pub mod rpc; pub mod rpc;
/// Support for installing the ExExs (execution extensions) in a node.
pub mod exex;
/// Re-export the core configuration traits. /// Re-export the core configuration traits.
pub use reth_node_core::cli::config::{ pub use reth_node_core::cli::config::{
PayloadBuilderConfig, RethNetworkConfig, RethRpcConfig, RethTransactionPoolConfig, PayloadBuilderConfig, RethNetworkConfig, RethRpcConfig, RethTransactionPoolConfig,

View File

@ -30,3 +30,5 @@ serde.workspace = true
[dev-dependencies] [dev-dependencies]
reth-db.workspace = true reth-db.workspace = true
futures.workspace = true

View File

@ -0,0 +1,33 @@
use futures::future;
use reth_db::test_utils::create_test_rw_db;
use reth_node_builder::{exex::ExExContext, FullNodeTypes, NodeBuilder, NodeConfig};
use reth_node_ethereum::EthereumNode;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
struct DummyExEx<Node: FullNodeTypes> {
_ctx: ExExContext<Node>,
}
impl<Node: FullNodeTypes> Future for DummyExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
#[test]
fn basic_exex() {
let config = NodeConfig::test();
let db = create_test_rw_db();
let _builder = NodeBuilder::new(config)
.with_database(db)
.with_types(EthereumNode::default())
.with_components(EthereumNode::components())
.install_exex(move |ctx| future::ok(DummyExEx { _ctx: ctx }))
.check_launch();
}

View File

@ -1,3 +1,4 @@
mod builder; mod builder;
mod exex;
fn main() {} fn main() {}