diff --git a/Cargo.lock b/Cargo.lock index afd37d295..3a1620059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6488,13 +6488,20 @@ dependencies = [ name = "reth-exex" version = "0.2.0-beta.5" dependencies = [ + "eyre", + "futures", + "metrics", "reth-config", + "reth-metrics", "reth-node-api", "reth-node-core", "reth-primitives", "reth-provider", "reth-tasks", + "reth-tracing", "tokio", + "tokio-stream", + "tokio-util", ] [[package]] diff --git a/crates/exex/Cargo.toml b/crates/exex/Cargo.toml index 91fd04ae9..d501a906e 100644 --- a/crates/exex/Cargo.toml +++ b/crates/exex/Cargo.toml @@ -12,10 +12,22 @@ description = "Execution extensions for Reth" workspace = true [dependencies] +## reth reth-config.workspace = true +reth-metrics.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true reth-primitives.workspace = true reth-provider.workspace = true reth-tasks.workspace = true +reth-tracing.workspace = true + +## async +futures.workspace = true tokio.workspace = true +tokio-stream.workspace = true +tokio-util.workspace = true + +## misc +eyre.workspace = true +metrics.workspace = true diff --git a/crates/exex/src/lib.rs b/crates/exex/src/lib.rs index 411e223af..638d8af79 100644 --- a/crates/exex/src/lib.rs +++ b/crates/exex/src/lib.rs @@ -1,6 +1,31 @@ -//! Execution extensions. +// todo: expand this (examples, assumptions, invariants) +//! Execution extensions (ExEx). //! -//! TBD +//! An execution extension is a task that derives its state from Reth's state. +//! +//! Some examples of such state derives are rollups, bridges, and indexers. +//! +//! An ExEx is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth. +//! +//! ExEx's are initialized using an async closure that resolves to the ExEx; this closure gets +//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth. +//! +//! Most ExEx's will want to derive their state from the [`CanonStateNotification`] channel given in +//! [`ExExContext`]. A new notification is emitted whenever blocks are executed in live and +//! historical sync. +//! +//! # Pruning +//! +//! ExEx's **SHOULD** emit an `ExExEvent::FinishedHeight` event to signify what blocks have been +//! processed. This event is used by Reth to determine what state can be pruned. +//! +//! An ExEx will only receive notifications for blocks greater than the block emitted in the event. +//! To clarify: if the ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for +//! any `block_number > 0`. +//! +//! [`Future`]: std::future::Future +//! [`ExExContext`]: crate::ExExContext +//! [`CanonStateNotification`]: reth_provider::CanonStateNotification #![doc( html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", @@ -14,3 +39,6 @@ pub use context::*; mod event; pub use event::*; + +mod manager; +pub use manager::*; diff --git a/crates/exex/src/manager.rs b/crates/exex/src/manager.rs new file mode 100644 index 000000000..7e202b2d7 --- /dev/null +++ b/crates/exex/src/manager.rs @@ -0,0 +1,466 @@ +use std::{ + collections::VecDeque, + future::{poll_fn, Future}, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use crate::ExExEvent; +use futures::StreamExt; +use metrics::Gauge; +use reth_metrics::{metrics::Counter, Metrics}; +use reth_primitives::BlockNumber; +use reth_provider::CanonStateNotification; +use reth_tracing::tracing::debug; +use tokio::sync::{ + mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender}, + watch, +}; +use tokio_stream::wrappers::WatchStream; +use tokio_util::sync::{PollSendError, PollSender}; + +/// Metrics for an ExEx. +#[derive(Metrics)] +#[metrics(scope = "exex")] +struct ExExMetrics { + /// The total number of canonical state notifications sent to an ExEx. + notifications_sent_total: Counter, + /// The total number of events an ExEx has sent to the manager. + events_sent_total: Counter, +} + +/// A handle to an ExEx used by the [`ExExManager`] to communicate with ExEx's. +/// +/// A handle should be created for each ExEx with a unique ID. The channels returned by +/// [`ExExHandle::new`] should be given to the ExEx, while the handle itself should be given to the +/// manager in [`ExExManager::new`]. +#[derive(Debug)] +pub struct ExExHandle { + /// The execution extension's ID. + id: String, + /// Metrics for an ExEx. + metrics: ExExMetrics, + + /// Channel to send [`CanonStateNotification`]s to the ExEx. + sender: PollSender, + /// Channel to receive [`ExExEvent`]s from the ExEx. + receiver: UnboundedReceiver, + /// The ID of the next notification to send to this ExEx. + next_notification_id: usize, + + /// The finished block number of the ExEx. + /// + /// If this is `None`, the ExEx has not emitted a `FinishedHeight` event. + finished_height: Option, +} + +impl ExExHandle { + /// Create a new handle for the given ExEx. + /// + /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a + /// [`Receiver`] for [`CanonStateNotification`]s that should be given to the ExEx. + pub fn new(id: String) -> (Self, UnboundedSender, Receiver) { + let (canon_tx, canon_rx) = mpsc::channel(1); + let (event_tx, event_rx) = mpsc::unbounded_channel(); + + ( + Self { + id: id.clone(), + metrics: ExExMetrics::new_with_labels(&[("exex", id)]), + sender: PollSender::new(canon_tx), + receiver: event_rx, + next_notification_id: 0, + finished_height: None, + }, + event_tx, + canon_rx, + ) + } + + /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was + /// successfully reserved. + /// + /// When the notification is sent, it is considered delivered. + fn send( + &mut self, + cx: &mut Context<'_>, + (event_id, notification): &(usize, CanonStateNotification), + ) -> Poll>> { + // check that this notification is above the finished height of the exex if the exex has set + // one + if let Some(finished_height) = self.finished_height { + if finished_height >= notification.tip().number { + self.next_notification_id = event_id + 1; + return Poll::Ready(Ok(())) + } + } + + match self.sender.poll_reserve(cx) { + Poll::Ready(Ok(())) => (), + other => return other, + } + + match self.sender.send_item(notification.clone()) { + Ok(()) => { + self.next_notification_id = event_id + 1; + self.metrics.notifications_sent_total.increment(1); + Poll::Ready(Ok(())) + } + Err(err) => Poll::Ready(Err(err)), + } + } +} + +/// Metrics for the ExEx manager. +#[derive(Metrics)] +#[metrics(scope = "exex_manager")] +pub struct ExExManagerMetrics { + /// Max size of the internal state notifications buffer. + max_capacity: Gauge, + /// Current capacity of the internal state notifications buffer. + current_capacity: Gauge, + /// Current size of the internal state notifications buffer. + /// + /// Note that this might be slightly bigger than the maximum capacity in some cases. + buffer_size: Gauge, +} + +/// The execution extension manager. +/// +/// The manager is responsible for: +/// +/// - Receiving relevant events from the rest of the node, and sending these to the execution +/// extensions +/// - Backpressure +/// - Error handling +/// - Monitoring +#[derive(Debug)] +pub struct ExExManager { + /// Handles to communicate with the ExEx's. + exex_handles: Vec, + + /// [`CanonStateNotification`] channel from the [`ExExManagerHandle`]s. + handle_rx: UnboundedReceiver, + + /// The minimum notification ID currently present in the buffer. + min_id: usize, + /// Monotonically increasing ID for [`CanonStateNotification`]s. + next_id: usize, + /// Internal buffer of [`CanonStateNotification`]s. + /// + /// The first element of the tuple is a monotonically increasing ID unique to the notification + /// (the second element of the tuple). + buffer: VecDeque<(usize, CanonStateNotification)>, + /// Max size of the internal state notifications buffer. + max_capacity: usize, + /// Current state notifications buffer capacity. + /// + /// Used to inform the execution stage of possible batch sizes. + current_capacity: Arc, + + /// Whether the manager is ready to receive new notifications. + is_ready: watch::Sender, + + /// The finished height of all ExEx's. + /// + /// This is the lowest common denominator between all ExEx's. If an ExEx has not emitted a + /// `FinishedHeight` event, it will be `None`. + /// + /// This block is used to (amongst other things) determine what blocks are safe to prune. + /// + /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. + finished_height: watch::Sender>, + + /// A handle to the ExEx manager. + handle: ExExManagerHandle, + /// Metrics for the ExEx manager. + metrics: ExExManagerMetrics, +} + +impl ExExManager { + /// Create a new [`ExExManager`]. + /// + /// You must provide an [`ExExHandle`] for each ExEx and the maximum capacity of the + /// notification buffer in the manager. + /// + /// When the capacity is exceeded (which can happen if an ExEx is slow) no one can send + /// notifications over [`ExExManagerHandle`]s until there is capacity again. + pub fn new(handles: Vec, max_capacity: usize) -> Self { + let num_exexs = handles.len(); + + let (handle_tx, handle_rx) = mpsc::unbounded_channel(); + let (is_ready_tx, is_ready_rx) = watch::channel(true); + let (finished_height_tx, finished_height_rx) = watch::channel(None); + + let current_capacity = Arc::new(AtomicUsize::new(max_capacity)); + + let metrics = ExExManagerMetrics::default(); + metrics.max_capacity.set(max_capacity as f64); + + Self { + exex_handles: handles, + + handle_rx, + + min_id: 0, + next_id: 0, + buffer: VecDeque::with_capacity(max_capacity), + max_capacity, + current_capacity: Arc::clone(¤t_capacity), + + is_ready: is_ready_tx, + finished_height: finished_height_tx, + + handle: ExExManagerHandle { + exex_tx: handle_tx, + num_exexs, + is_ready_receiver: is_ready_rx.clone(), + is_ready: WatchStream::new(is_ready_rx), + current_capacity, + finished_height: finished_height_rx, + }, + metrics, + } + } + + /// Returns the handle to the manager. + pub fn handle(&self) -> ExExManagerHandle { + self.handle.clone() + } + + /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's + /// readiness to receive notifications. + fn update_capacity(&mut self) { + let capacity = self.max_capacity.saturating_sub(self.buffer.len()); + self.current_capacity.store(capacity, Ordering::Relaxed); + self.metrics.current_capacity.set(capacity as f64); + self.metrics.buffer_size.set(self.buffer.len() as f64); + + // we can safely ignore if the channel is closed, since the manager always holds it open + // internally + let _ = self.is_ready.send(capacity > 0); + } + + /// Pushes a new notification into the managers internal buffer, assigning the notification a + /// unique ID. + fn push_notification(&mut self, notification: CanonStateNotification) { + let next_id = self.next_id; + self.buffer.push_back((next_id, notification)); + self.next_id += 1; + } +} + +impl Future for ExExManager { + type Output = eyre::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // drain handle notifications + while self.buffer.len() < self.max_capacity { + if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) { + debug!("received new notification"); + self.push_notification(notification); + continue + } + break + } + + // update capacity + self.update_capacity(); + + // advance all poll senders + let mut min_id = usize::MAX; + for idx in (0..self.exex_handles.len()).rev() { + let mut exex = self.exex_handles.swap_remove(idx); + + // it is a logic error for this to ever underflow since the manager manages the + // notification IDs + let notification_id = exex + .next_notification_id + .checked_sub(self.min_id) + .expect("exex expected notification ID outside the manager's range"); + if let Some(notification) = self.buffer.get(notification_id) { + debug!(exex.id, notification_id, "sent notification to exex"); + if let Poll::Ready(Err(err)) = exex.send(cx, notification) { + // the channel was closed, which is irrecoverable for the manager + return Poll::Ready(Err(err.into())) + } + } + min_id = min_id.min(exex.next_notification_id); + self.exex_handles.push(exex); + } + + // remove processed buffered notifications + self.buffer.retain(|&(id, _)| id >= min_id); + self.min_id = min_id; + debug!(min_id, "lowest notification id in buffer updated"); + + // update capacity + self.update_capacity(); + + // handle incoming exex events + for exex in self.exex_handles.iter_mut() { + while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) { + debug!(?event, id = exex.id, "received event from exex"); + exex.metrics.events_sent_total.increment(1); + match event { + ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height), + } + } + } + + // update watch channel block number + let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| { + let height = match exex.finished_height { + None => return Err(()), + Some(height) => height, + }; + + if height < curr { + Ok(height) + } else { + Ok(curr) + } + }); + if let Ok(finished_height) = finished_height { + let _ = self.finished_height.send(Some(finished_height)); + } + + Poll::Pending + } +} + +/// A handle to communicate with the [`ExExManager`]. +#[derive(Debug)] +pub struct ExExManagerHandle { + /// Channel to send notifications to the ExEx manager. + exex_tx: UnboundedSender, + /// The number of ExEx's running on the node. + num_exexs: usize, + /// A watch channel denoting whether the manager is ready for new notifications or not. + /// + /// This is stored internally alongside a `WatchStream` representation of the same value. This + /// field is only used to create a new `WatchStream` when the handle is cloned, but is + /// otherwise unused. + is_ready_receiver: watch::Receiver, + /// A stream of bools denoting whether the manager is ready for new notifications. + is_ready: WatchStream, + /// The current capacity of the manager's internal notification buffer. + current_capacity: Arc, + /// The finished height of all ExEx's. + /// + /// This is the lowest common denominator between all ExEx's. If an ExEx has not emitted a + /// `FinishedHeight` event, it will be `None`. + /// + /// This block is used to (amongst other things) determine what blocks are safe to prune. + /// + /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. + finished_height: watch::Receiver>, +} + +impl ExExManagerHandle { + /// Synchronously send a notification over the channel to all execution extensions. + /// + /// Senders should call [`Self::has_capacity`] first. + pub fn send( + &self, + notification: CanonStateNotification, + ) -> Result<(), SendError> { + self.exex_tx.send(notification) + } + + /// Asynchronously send a notification over the channel to all execution extensions. + /// + /// The returned future resolves when the notification has been delivered. If there is no + /// capacity in the channel, the future will wait. + pub async fn send_async( + &mut self, + notification: CanonStateNotification, + ) -> Result<(), SendError> { + self.ready().await; + self.exex_tx.send(notification) + } + + /// Get the current capacity of the ExEx manager's internal notification buffer. + pub fn capacity(&self) -> usize { + self.current_capacity.load(Ordering::Relaxed) + } + + /// Whether there is capacity in the ExEx manager's internal notification buffer. + /// + /// If this returns `false`, the owner of the handle should **NOT** send new notifications over + /// the channel until the manager is ready again, as this can lead to unbounded memory growth. + pub fn has_capacity(&self) -> bool { + self.current_capacity.load(Ordering::Relaxed) > 0 + } + + /// Returns `true` if there are ExEx's installed in the node. + pub fn has_exexs(&self) -> bool { + self.num_exexs > 0 + } + + /// The finished height of all ExEx's. + /// + /// This is the lowest common denominator between all ExEx's. If an ExEx has not emitted a + /// `FinishedHeight` event, it will be `None`. + /// + /// This block is used to (amongst other things) determine what blocks are safe to prune. + /// + /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. + pub fn finished_height(&mut self) -> Option { + *self.finished_height.borrow_and_update() + } + + /// Wait until the manager is ready for new notifications. + pub async fn ready(&mut self) { + poll_fn(|cx| self.poll_ready(cx)).await + } + + /// Wait until the manager is ready for new notifications. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // if this returns `Poll::Ready(None)` the stream is exhausted, which means the underlying + // channel is closed. + // + // this can only happen if the manager died, and the node is shutting down, so we ignore it + let mut pinned = std::pin::pin!(&mut self.is_ready); + if pinned.poll_next_unpin(cx) == Poll::Ready(Some(true)) { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +impl Clone for ExExManagerHandle { + fn clone(&self) -> Self { + Self { + exex_tx: self.exex_tx.clone(), + num_exexs: self.num_exexs, + is_ready_receiver: self.is_ready_receiver.clone(), + is_ready: WatchStream::new(self.is_ready_receiver.clone()), + current_capacity: self.current_capacity.clone(), + finished_height: self.finished_height.clone(), + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn delivers_events() {} + + #[tokio::test] + async fn capacity() {} + + #[tokio::test] + async fn updates_block_height() {} + + #[tokio::test] + async fn slow_exex() {} + + #[tokio::test] + async fn is_ready() {} +} diff --git a/crates/node-builder/Cargo.toml b/crates/node-builder/Cargo.toml index 4996878fc..693a20ac0 100644 --- a/crates/node-builder/Cargo.toml +++ b/crates/node-builder/Cargo.toml @@ -36,7 +36,6 @@ reth-prune.workspace = true reth-stages.workspace = true reth-config.workspace = true - ## async futures.workspace = true tokio = { workspace = true, features = [ diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index 919426856..00e62bc30 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -14,7 +14,7 @@ use crate::{ Node, NodeHandle, }; use eyre::Context; -use futures::{future::Either, stream, stream_select, Future, StreamExt}; +use futures::{future, future::Either, stream, stream_select, Future, StreamExt}; use rayon::ThreadPoolBuilder; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook, StaticFileHook}, @@ -28,7 +28,7 @@ use reth_db::{ test_utils::{create_test_rw_db, TempDatabase}, DatabaseEnv, }; -use reth_exex::ExExContext; +use reth_exex::{ExExContext, ExExHandle, ExExManager}; use reth_interfaces::p2p::either::EitherDownloader; use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle}; use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes}; @@ -44,7 +44,9 @@ use reth_node_core::{ utils::write_peers_to_file, }; use reth_primitives::{constants::eip4844::MAINNET_KZG_TRUSTED_SETUP, format_ether, ChainSpec}; -use reth_provider::{providers::BlockchainProvider, ChainSpecProvider, ProviderFactory}; +use reth_provider::{ + providers::BlockchainProvider, CanonStateSubscriptions, ChainSpecProvider, ProviderFactory, +}; use reth_prune::PrunerBuilder; use reth_revm::EvmProcessorFactory; use reth_rpc_engine_api::EngineApi; @@ -434,7 +436,11 @@ where } /// Installs an ExEx (Execution Extension) in the node. - pub fn install_exex(mut self, exex: F) -> Self + /// + /// # Note + /// + /// The ExEx ID must be unique. + pub fn install_exex(mut self, exex_id: impl Into, exex: F) -> Self where F: Fn( ExExContext< @@ -449,7 +455,7 @@ where R: Future> + Send, E: Future> + Send, { - self.state.exexs.push(Box::new(exex)); + self.state.exexs.push((exex_id.into(), Box::new(exex))); self } @@ -561,8 +567,6 @@ where let NodeComponents { transaction_pool, network, payload_builder } = components_builder.build_components(&ctx).await?; - // TODO(alexey): launch ExExs and consume their events - let BuilderContext { provider: blockchain_db, executor, @@ -585,6 +589,69 @@ where debug!(target: "reth::cli", "calling on_component_initialized hook"); on_component_initialized.on_event(node_components.clone())?; + // spawn exexs + let mut exex_handles = Vec::with_capacity(self.state.exexs.len()); + let mut exexs = Vec::with_capacity(self.state.exexs.len()); + for (id, exex) in self.state.exexs { + // create a new exex handle + let (handle, events, notifications) = ExExHandle::new(id.clone()); + exex_handles.push(handle); + + // create the launch context for the exex + let context = ExExContext { + head, + provider: blockchain_db.clone(), + task_executor: executor.clone(), + data_dir: data_dir.clone(), + config: config.clone(), + reth_config: reth_config.clone(), + events, + notifications, + }; + + let executor = executor.clone(); + exexs.push(async move { + debug!(target: "reth::cli", id, "spawning exex"); + let span = reth_tracing::tracing::info_span!("exex", id); + let _enter = span.enter(); + + // init the exex + let exex = exex.launch(context).await.unwrap(); + + // spawn it as a crit task + executor.spawn_critical("exex", async move { + info!(target: "reth::cli", id, "ExEx started"); + exex.await.unwrap_or_else(|_| panic!("exex {} crashed", id)) + }); + }); + } + + future::join_all(exexs).await; + + // spawn exex manager + if !exex_handles.is_empty() { + debug!(target: "reth::cli", "spawning exex manager"); + // todo(onbjerg): rm magic number + let exex_manager = ExExManager::new(exex_handles, 1024); + let mut exex_manager_handle = exex_manager.handle(); + executor.spawn_critical("exex manager", async move { + exex_manager.await.expect("exex manager crashed"); + }); + + // send notifications from the blockchain tree to exex manager + let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state(); + executor.spawn_critical("exex manager blockchain tree notifications", async move { + while let Ok(notification) = canon_state_notifications.recv().await { + exex_manager_handle + .send_async(notification) + .await + .expect("blockchain tree notification could not be sent to exex manager"); + } + }); + + info!(target: "reth::cli", "ExEx Manager started"); + } + // create pipeline let network_client = network.fetch_client().await?; let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel(); @@ -1070,7 +1137,7 @@ where } /// Installs an ExEx (Execution Extension) in the node. - pub fn install_exex(mut self, exex: F) -> Self + pub fn install_exex(mut self, exex_id: impl Into, exex: F) -> Self where F: Fn( ExExContext< @@ -1085,7 +1152,7 @@ where R: Future> + Send, E: Future> + Send, { - self.builder.state.exexs.push(Box::new(exex)); + self.builder.state.exexs.push((exex_id.into(), Box::new(exex))); self } @@ -1301,7 +1368,7 @@ pub struct ComponentsState { /// Additional RPC hooks. rpc: RpcHooks, /// The ExExs (execution extensions) of the node. - exexs: Vec>>, + exexs: Vec<(String, Box>)>, } impl std::fmt::Debug diff --git a/crates/node-builder/src/exex.rs b/crates/node-builder/src/exex.rs index d4bec54e7..ff2d0f84a 100644 --- a/crates/node-builder/src/exex.rs +++ b/crates/node-builder/src/exex.rs @@ -1,33 +1,4 @@ -#![allow(dead_code)] -// todo: expand this (examples, assumptions, invariants) -//! Execution extensions (ExEx). -//! -//! An execution extension is a task that derives its state from Reth's state. -//! -//! Some examples of state such state derives are rollups, bridges, and indexers. -//! -//! An ExEx is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth. -//! -//! ExEx's are initialized using an async closure that resolves to the ExEx; this closure gets -//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth. -//! -//! Most ExEx's will want to derive their state from the [`CanonStateNotification`] channel given in -//! [`ExExContext`]. A new notification is emitted whenever blocks are executed in live and -//! historical sync. -//! -//! # Pruning -//! -//! ExEx's **SHOULD** emit an `ExExEvent::FinishedHeight` event to signify what blocks have been -//! processed. This event is used by Reth to determine what state can be pruned. -//! -//! An ExEx will not receive notifications for blocks less than the block emitted in the event. To -//! clarify: if the ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for any -//! `block_number >= 0`. -//! -//! [`Future`]: std::future::Future -//! [`ExExContext`]: reth_exex::ExExContext -//! [`CanonStateNotification`]: reth_provider::CanonStateNotification - +//! Types for launching execution extensions (ExEx). use crate::FullNodeTypes; use futures::{future::BoxFuture, FutureExt}; use reth_exex::ExExContext; diff --git a/crates/node-ethereum/tests/it/exex.rs b/crates/node-ethereum/tests/it/exex.rs index b98f9e5fc..b1f7a92f7 100644 --- a/crates/node-ethereum/tests/it/exex.rs +++ b/crates/node-ethereum/tests/it/exex.rs @@ -29,6 +29,6 @@ fn basic_exex() { .with_database(db) .with_types(EthereumNode::default()) .with_components(EthereumNode::components()) - .install_exex(move |ctx| future::ok(DummyExEx { _ctx: ctx })) + .install_exex("dummy", move |ctx| future::ok(DummyExEx { _ctx: ctx })) .check_launch(); }