diff --git a/Cargo.lock b/Cargo.lock index 0705e8aa8..ae1d61fa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6199,6 +6199,7 @@ name = "reth-node-ethereum" version = "0.2.0-beta.4" dependencies = [ "eyre", + "futures", "reth-basic-payload-builder", "reth-db", "reth-ethereum-payload-builder", diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index ae90d7928..74a0564d6 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -7,13 +7,14 @@ use crate::{ ComponentsBuilder, FullNodeComponents, FullNodeComponentsAdapter, NodeComponents, NodeComponentsBuilder, PoolBuilder, }, + exex::{BoxedLaunchExEx, ExExContext}, hooks::NodeHooks, node::{FullNode, FullNodeTypes, FullNodeTypesAdapter}, rpc::{RethRpcServerHandles, RpcContext, RpcHooks}, Node, NodeHandle, }; use eyre::Context; -use futures::{future::Either, stream, stream_select, StreamExt}; +use futures::{future::Either, stream, stream_select, Future, StreamExt}; use rayon::ThreadPoolBuilder; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook, StaticFileHook}, @@ -318,6 +319,7 @@ where components_builder, hooks: NodeHooks::new(), rpc: RpcHooks::new(), + exexs: Vec::new(), }, } } @@ -352,6 +354,7 @@ where components_builder: f(self.state.components_builder), hooks: self.state.hooks, rpc: self.state.rpc, + exexs: self.state.exexs, }, } } @@ -429,6 +432,26 @@ where self } + /// Installs an ExEx (Execution Extension) in the node. + pub fn install_exex(mut self, exex: F) -> Self + where + F: Fn( + ExExContext< + FullNodeComponentsAdapter< + FullNodeTypesAdapter>, + Components::Pool, + >, + >, + ) -> R + + Send + + 'static, + R: Future> + Send, + E: Future> + Send, + { + self.state.exexs.push(Box::new(exex)); + self + } + /// Launches the node and returns a handle to it. /// /// This bootstraps the node internals, creates all the components with the provider @@ -452,7 +475,7 @@ where let Self { config, - state: ComponentsState { types, components_builder, hooks, rpc }, + state: ComponentsState { types, components_builder, hooks, rpc, exexs: _ }, database, } = self; @@ -529,6 +552,8 @@ where let NodeComponents { transaction_pool, network, payload_builder } = components_builder.build_components(&ctx).await?; + // TODO(alexey): launch ExExs and consume their events + let BuilderContext { provider: blockchain_db, executor, @@ -1059,7 +1084,6 @@ where } /// Captures the necessary context for building the components of the node. -#[derive(Debug)] pub struct BuilderContext { /// The current head of the blockchain at launch. head: Head, @@ -1075,6 +1099,18 @@ pub struct BuilderContext { reth_config: reth_config::Config, } +impl std::fmt::Debug for BuilderContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BuilderContext") + .field("head", &self.head) + .field("provider", &std::any::type_name::()) + .field("executor", &self.executor) + .field("data_dir", &self.data_dir) + .field("config", &self.config) + .finish() + } +} + impl BuilderContext { /// Create a new instance of [BuilderContext] pub fn new( @@ -1217,7 +1253,6 @@ where /// /// Additionally, this state captures additional hooks that are called at specific points in the /// node's launch lifecycle. -#[derive(Debug)] pub struct ComponentsState { /// The types of the node. types: Types, @@ -1227,4 +1262,20 @@ pub struct ComponentsState { hooks: NodeHooks, /// Additional RPC hooks. rpc: RpcHooks, + /// The ExExs (execution extensions) of the node. + exexs: Vec>>, +} + +impl std::fmt::Debug + for ComponentsState +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ComponentsState") + .field("types", &std::any::type_name::()) + .field("components_builder", &std::any::type_name::()) + .field("hooks", &self.hooks) + .field("rpc", &self.rpc) + .field("exexs", &self.exexs.len()) + .finish() + } } diff --git a/crates/node-builder/src/components/builder.rs b/crates/node-builder/src/components/builder.rs index 57e4d8639..c60a02048 100644 --- a/crates/node-builder/src/components/builder.rs +++ b/crates/node-builder/src/components/builder.rs @@ -94,7 +94,7 @@ where where PB: PoolBuilder, { - let Self { payload_builder, network_builder, _marker, .. } = self; + let Self { pool_builder: _, payload_builder, network_builder, _marker } = self; ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } } } @@ -112,7 +112,7 @@ where where NB: NetworkBuilder, { - let Self { payload_builder, pool_builder, _marker, .. } = self; + let Self { pool_builder, payload_builder, network_builder: _, _marker } = self; ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } } @@ -124,7 +124,7 @@ where where PB: PayloadServiceBuilder, { - let Self { pool_builder, network_builder, _marker, .. } = self; + let Self { pool_builder, payload_builder: _, network_builder, _marker } = self; ComponentsBuilder { pool_builder, payload_builder, network_builder, _marker } } } diff --git a/crates/node-builder/src/exex.rs b/crates/node-builder/src/exex.rs new file mode 100644 index 000000000..1482be14f --- /dev/null +++ b/crates/node-builder/src/exex.rs @@ -0,0 +1,127 @@ +#![allow(dead_code)] +// todo: expand this (examples, assumptions, invariants) +//! Execution extensions (ExEx). +//! +//! An execution extension is a task that derives its state from Reth's state. +//! +//! Some examples of state such state derives are rollups, bridges, and indexers. +//! +//! An ExEx is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth. +//! +//! ExEx's are initialized using an async closure that resolves to the ExEx; this closure gets +//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth. +//! +//! Most ExEx's will want to derive their state from the [`CanonStateNotification`] channel given in +//! [`ExExContext`]. A new notification is emitted whenever blocks are executed in live and +//! historical sync. +//! +//! # Pruning +//! +//! ExEx's **SHOULD** emit an `ExExEvent::FinishedHeight` event to signify what blocks have been +//! processed. This event is used by Reth to determine what state can be pruned. +//! +//! An ExEx will not receive notifications for blocks less than the block emitted in the event. To +//! clarify: if the ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for any +//! `block_number >= 0`. +//! +//! [`Future`]: std::future::Future +//! [`ExExContext`]: crate::exex::ExExContext +//! [`CanonStateNotification`]: reth_provider::CanonStateNotification + +use crate::FullNodeTypes; +use futures::{future::BoxFuture, FutureExt}; +use reth_node_core::{ + dirs::{ChainPath, DataDirPath}, + node_config::NodeConfig, +}; +use reth_primitives::{BlockNumber, Head}; +use reth_tasks::TaskExecutor; +use std::future::Future; + +/// Events emitted by an ExEx. +#[derive(Debug)] +pub enum ExExEvent { + /// Highest block processed by the ExEx. + /// + /// The ExEx must guarantee that it will not require all earlier blocks in the future, meaning + /// that Reth is allowed to prune them. + /// + /// On reorgs, it's possible for the height to go down. + FinishedHeight(BlockNumber), +} + +/// Captures the context that an ExEx has access to. +#[derive(Clone, Debug)] +pub struct ExExContext { + /// The current head of the blockchain at launch. + pub head: Head, + /// The configured provider to interact with the blockchain. + pub provider: Node::Provider, + /// The task executor of the node. + pub task_executor: TaskExecutor, + /// The data dir of the node. + pub data_dir: ChainPath, + /// The config of the node + pub config: NodeConfig, + /// The loaded node config + pub reth_config: reth_config::Config, + // TODO(alexey): add pool, payload builder, anything else? +} + +/// A trait for launching an ExEx. +trait LaunchExEx: Send { + /// Launches the ExEx. + /// + /// The ExEx should be able to run independently and emit events on the channels provided in + /// the [`ExExContext`]. + fn launch( + self, + ctx: ExExContext, + ) -> impl Future> + Send>> + Send; +} + +type BoxExEx = BoxFuture<'static, eyre::Result<()>>; + +/// A version of [LaunchExEx] that returns a boxed future. Makes the trait object-safe. +pub(crate) trait BoxedLaunchExEx: Send { + fn launch(self: Box, ctx: ExExContext) + -> BoxFuture<'static, eyre::Result>; +} + +/// Implements [BoxedLaunchExEx] for any [LaunchExEx] that is [Send] and `'static`. +/// +/// Returns a [BoxFuture] that resolves to a [BoxExEx]. +impl BoxedLaunchExEx for E +where + E: LaunchExEx + Send + 'static, + Node: FullNodeTypes, +{ + fn launch( + self: Box, + ctx: ExExContext, + ) -> BoxFuture<'static, eyre::Result> { + async move { + let exex = LaunchExEx::launch(*self, ctx).await?; + Ok(Box::pin(exex) as BoxExEx) + } + .boxed() + } +} + +/// Implements `LaunchExEx` for any closure that takes an [ExExContext] and returns a future +/// resolving to an ExEx. +impl LaunchExEx for F +where + Node: FullNodeTypes, + F: FnOnce(ExExContext) -> Fut + Send, + Fut: Future> + Send, + E: Future> + Send, +{ + fn launch( + self, + ctx: ExExContext, + ) -> impl Future> + Send>> + Send + { + self(ctx) + } +} diff --git a/crates/node-builder/src/lib.rs b/crates/node-builder/src/lib.rs index 9323fbd32..757e37f04 100644 --- a/crates/node-builder/src/lib.rs +++ b/crates/node-builder/src/lib.rs @@ -27,6 +27,9 @@ pub use handle::NodeHandle; pub mod provider; pub mod rpc; +/// Support for installing the ExExs (execution extensions) in a node. +pub mod exex; + /// Re-export the core configuration traits. pub use reth_node_core::cli::config::{ PayloadBuilderConfig, RethNetworkConfig, RethRpcConfig, RethTransactionPoolConfig, diff --git a/crates/node-ethereum/Cargo.toml b/crates/node-ethereum/Cargo.toml index 91a59cd72..12c3ab5ea 100644 --- a/crates/node-ethereum/Cargo.toml +++ b/crates/node-ethereum/Cargo.toml @@ -29,4 +29,6 @@ eyre.workspace = true serde.workspace = true [dev-dependencies] -reth-db.workspace = true \ No newline at end of file +reth-db.workspace = true +futures.workspace = true + diff --git a/crates/node-ethereum/tests/it/exex.rs b/crates/node-ethereum/tests/it/exex.rs new file mode 100644 index 000000000..bf21bd549 --- /dev/null +++ b/crates/node-ethereum/tests/it/exex.rs @@ -0,0 +1,33 @@ +use futures::future; +use reth_db::test_utils::create_test_rw_db; +use reth_node_builder::{exex::ExExContext, FullNodeTypes, NodeBuilder, NodeConfig}; +use reth_node_ethereum::EthereumNode; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +struct DummyExEx { + _ctx: ExExContext, +} + +impl Future for DummyExEx { + type Output = eyre::Result<()>; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Pending + } +} + +#[test] +fn basic_exex() { + let config = NodeConfig::test(); + let db = create_test_rw_db(); + let _builder = NodeBuilder::new(config) + .with_database(db) + .with_types(EthereumNode::default()) + .with_components(EthereumNode::components()) + .install_exex(move |ctx| future::ok(DummyExEx { _ctx: ctx })) + .check_launch(); +} diff --git a/crates/node-ethereum/tests/it/main.rs b/crates/node-ethereum/tests/it/main.rs index 34ac15ef9..5fc321fe3 100644 --- a/crates/node-ethereum/tests/it/main.rs +++ b/crates/node-ethereum/tests/it/main.rs @@ -1,3 +1,4 @@ mod builder; +mod exex; fn main() {}