From d6b861ea5d068c17baf160e1fd1ca37cdbe154cc Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 23 Apr 2024 11:05:46 +0100 Subject: [PATCH] feat(exex): send `ExExNotification` instead of `CanonStateNotification` (#7803) --- Cargo.lock | 2 +- crates/exex/src/context.rs | 10 ++-- crates/exex/src/lib.rs | 3 ++ crates/exex/src/manager.rs | 58 ++++++++++----------- crates/exex/src/notification.rs | 54 ++++++++++++++++++++ crates/node-builder/src/builder.rs | 2 +- crates/stages/src/stages/execution.rs | 11 ++-- examples/exex/minimal/Cargo.toml | 2 +- examples/exex/minimal/src/main.rs | 23 +++++---- examples/exex/op-bridge/src/main.rs | 72 ++++++++++++++------------- 10 files changed, 148 insertions(+), 89 deletions(-) create mode 100644 crates/exex/src/notification.rs diff --git a/Cargo.lock b/Cargo.lock index dfb64fdd7..d7effc49c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4723,7 +4723,7 @@ dependencies = [ "reth-node-core", "reth-node-ethereum", "reth-primitives", - "reth-provider", + "reth-tracing", "tokio", ] diff --git a/crates/exex/src/context.rs b/crates/exex/src/context.rs index 619679e85..df2b51377 100644 --- a/crates/exex/src/context.rs +++ b/crates/exex/src/context.rs @@ -4,11 +4,10 @@ use reth_node_core::{ node_config::NodeConfig, }; use reth_primitives::Head; -use reth_provider::CanonStateNotification; use reth_tasks::TaskExecutor; use tokio::sync::mpsc::{Receiver, UnboundedSender}; -use crate::ExExEvent; +use crate::{ExExEvent, ExExNotification}; /// Captures the context that an ExEx has access to. #[derive(Debug)] @@ -35,12 +34,11 @@ pub struct ExExContext { /// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what /// blocks to receive notifications for. pub events: UnboundedSender, - /// Channel to receive [`CanonStateNotification`]s on state transitions. + /// Channel to receive [`ExExNotification`]s. /// /// # Important /// - /// Once a `CanonStateNotification` is sent over the channel, it is considered delivered by the + /// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the /// node. - pub notifications: Receiver, - // TODO(alexey): add pool, payload builder, anything else? + pub notifications: Receiver, } diff --git a/crates/exex/src/lib.rs b/crates/exex/src/lib.rs index 638d8af79..4e2d0dd85 100644 --- a/crates/exex/src/lib.rs +++ b/crates/exex/src/lib.rs @@ -42,3 +42,6 @@ pub use event::*; mod manager; pub use manager::*; + +mod notification; +pub use notification::*; diff --git a/crates/exex/src/manager.rs b/crates/exex/src/manager.rs index 59f2bde58..95b950f32 100644 --- a/crates/exex/src/manager.rs +++ b/crates/exex/src/manager.rs @@ -1,8 +1,7 @@ -use crate::ExExEvent; +use crate::{ExExEvent, ExExNotification}; use metrics::Gauge; use reth_metrics::{metrics::Counter, Metrics}; use reth_primitives::{BlockNumber, FinishedExExHeight}; -use reth_provider::CanonStateNotification; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, @@ -24,7 +23,7 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; #[derive(Metrics)] #[metrics(scope = "exex")] struct ExExMetrics { - /// The total number of canonical state notifications sent to an ExEx. + /// The total number of notifications sent to an ExEx. notifications_sent_total: Counter, /// The total number of events an ExEx has sent to the manager. events_sent_total: Counter, @@ -42,8 +41,8 @@ pub struct ExExHandle { /// Metrics for an ExEx. metrics: ExExMetrics, - /// Channel to send [`CanonStateNotification`]s to the ExEx. - sender: PollSender, + /// Channel to send [`ExExNotification`]s to the ExEx. + sender: PollSender, /// Channel to receive [`ExExEvent`]s from the ExEx. receiver: UnboundedReceiver, /// The ID of the next notification to send to this ExEx. @@ -59,22 +58,22 @@ impl ExExHandle { /// Create a new handle for the given ExEx. /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a - /// [`Receiver`] for [`CanonStateNotification`]s that should be given to the ExEx. - pub fn new(id: String) -> (Self, UnboundedSender, Receiver) { - let (canon_tx, canon_rx) = mpsc::channel(1); + /// [`Receiver`] for [`ExExNotification`]s that should be given to the ExEx. + pub fn new(id: String) -> (Self, UnboundedSender, Receiver) { + let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); ( Self { id: id.clone(), metrics: ExExMetrics::new_with_labels(&[("exex", id)]), - sender: PollSender::new(canon_tx), + sender: PollSender::new(notification_tx), receiver: event_rx, next_notification_id: 0, finished_height: None, }, event_tx, - canon_rx, + notification_rx, ) } @@ -85,14 +84,20 @@ impl ExExHandle { fn send( &mut self, cx: &mut Context<'_>, - (event_id, notification): &(usize, CanonStateNotification), - ) -> Poll>> { + (event_id, notification): &(usize, ExExNotification), + ) -> Poll>> { // check that this notification is above the finished height of the exex if the exex has set // one if let Some(finished_height) = self.finished_height { - if finished_height >= notification.tip().number { - self.next_notification_id = event_id + 1; - return Poll::Ready(Ok(())) + match notification { + ExExNotification::ChainCommitted { new } | + ExExNotification::ChainReorged { old: _, new } + if finished_height >= new.tip().number => + { + self.next_notification_id = event_id + 1; + return Poll::Ready(Ok(())) + } + _ => (), } } @@ -142,18 +147,18 @@ pub struct ExExManager { /// Handles to communicate with the ExEx's. exex_handles: Vec, - /// [`CanonStateNotification`] channel from the [`ExExManagerHandle`]s. - handle_rx: UnboundedReceiver, + /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s. + handle_rx: UnboundedReceiver, /// The minimum notification ID currently present in the buffer. min_id: usize, - /// Monotonically increasing ID for [`CanonStateNotification`]s. + /// Monotonically increasing ID for [`ExExNotification`]s. next_id: usize, - /// Internal buffer of [`CanonStateNotification`]s. + /// Internal buffer of [`ExExNotification`]s. /// /// The first element of the tuple is a monotonically increasing ID unique to the notification /// (the second element of the tuple). - buffer: VecDeque<(usize, CanonStateNotification)>, + buffer: VecDeque<(usize, ExExNotification)>, /// Max size of the internal state notifications buffer. max_capacity: usize, /// Current state notifications buffer capacity. @@ -244,7 +249,7 @@ impl ExExManager { /// Pushes a new notification into the managers internal buffer, assigning the notification a /// unique ID. - fn push_notification(&mut self, notification: CanonStateNotification) { + fn push_notification(&mut self, notification: ExExNotification) { let next_id = self.next_id; self.buffer.push_back((next_id, notification)); self.next_id += 1; @@ -334,7 +339,7 @@ impl Future for ExExManager { #[derive(Debug)] pub struct ExExManagerHandle { /// Channel to send notifications to the ExEx manager. - exex_tx: UnboundedSender, + exex_tx: UnboundedSender, /// The number of ExEx's running on the node. num_exexs: usize, /// A watch channel denoting whether the manager is ready for new notifications or not. @@ -376,10 +381,7 @@ impl ExExManagerHandle { /// Synchronously send a notification over the channel to all execution extensions. /// /// Senders should call [`Self::has_capacity`] first. - pub fn send( - &self, - notification: CanonStateNotification, - ) -> Result<(), SendError> { + pub fn send(&self, notification: ExExNotification) -> Result<(), SendError> { self.exex_tx.send(notification) } @@ -389,8 +391,8 @@ impl ExExManagerHandle { /// capacity in the channel, the future will wait. pub async fn send_async( &mut self, - notification: CanonStateNotification, - ) -> Result<(), SendError> { + notification: ExExNotification, + ) -> Result<(), SendError> { self.ready().await; self.exex_tx.send(notification) } diff --git a/crates/exex/src/notification.rs b/crates/exex/src/notification.rs new file mode 100644 index 000000000..ae8091e0c --- /dev/null +++ b/crates/exex/src/notification.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use reth_provider::{CanonStateNotification, Chain}; + +/// Notifications sent to an ExEx. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExExNotification { + /// Chain got committed without a reorg, and only the new chain is returned. + ChainCommitted { + /// The new chain after commit. + new: Arc, + }, + /// Chain got reorged, and both the old and the new chains are returned. + ChainReorged { + /// The old chain before reorg. + old: Arc, + /// The new chain after reorg. + new: Arc, + }, + /// Chain got reverted, and only the old chain is returned. + ChainReverted { + /// The old chain before reversion. + old: Arc, + }, +} + +impl ExExNotification { + /// Returns the committed chain from the [Self::ChainCommitted] and [Self::ChainReorged] + /// variants, if any. + pub fn committed_chain(&self) -> Option> { + match self { + Self::ChainCommitted { new } | Self::ChainReorged { old: _, new } => Some(new.clone()), + Self::ChainReverted { .. } => None, + } + } + + /// Returns the reverted chain from the [Self::ChainReorged] and [Self::ChainReverted] variants, + /// if any. + pub fn reverted_chain(&self) -> Option> { + match self { + Self::ChainReorged { old, new: _ } | Self::ChainReverted { old } => Some(old.clone()), + Self::ChainCommitted { .. } => None, + } + } +} + +impl From for ExExNotification { + fn from(notification: CanonStateNotification) -> Self { + match notification { + CanonStateNotification::Commit { new } => Self::ChainCommitted { new }, + CanonStateNotification::Reorg { old, new } => Self::ChainReorged { old, new }, + } + } +} diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index 49be32b33..c47478047 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -661,7 +661,7 @@ where executor.spawn_critical("exex manager blockchain tree notifications", async move { while let Ok(notification) = canon_state_notifications.recv().await { handle - .send_async(notification) + .send_async(notification.into()) .await .expect("blockchain tree notification could not be sent to exex manager"); } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 7f22ecaef..b581af403 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -3,7 +3,7 @@ use num_traits::Zero; use reth_db::{ cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx, }; -use reth_exex::ExExManagerHandle; +use reth_exex::{ExExManagerHandle, ExExNotification}; use reth_primitives::{ stage::{ CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId, @@ -12,9 +12,8 @@ use reth_primitives::{ }; use reth_provider::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter}, - BlockReader, CanonStateNotification, Chain, DatabaseProviderRW, ExecutorFactory, - HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader, - TransactionVariant, + BlockReader, Chain, DatabaseProviderRW, ExecutorFactory, HeaderProvider, + LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant, }; use reth_stages_api::{ BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError, @@ -265,7 +264,7 @@ impl ExecutionStage { // NOTE: We can ignore the error here, since an error means that the channel is closed, // which means the manager has died, which then in turn means the node is shutting down. - let _ = self.exex_manager_handle.send(CanonStateNotification::Commit { new: chain }); + let _ = self.exex_manager_handle.send(ExExNotification::ChainCommitted { new: chain }); } let time = Instant::now(); @@ -436,7 +435,7 @@ impl Stage for ExecutionStage { // NOTE: We can ignore the error here, since an error means that the channel is closed, // which means the manager has died, which then in turn means the node is shutting down. - let _ = self.exex_manager_handle.send(CanonStateNotification::Reorg { + let _ = self.exex_manager_handle.send(ExExNotification::ChainReorged { old: Arc::new(chain), new: Arc::new(Chain::default()), }); diff --git a/examples/exex/minimal/Cargo.toml b/examples/exex/minimal/Cargo.toml index c1c586fd5..fc6eba841 100644 --- a/examples/exex/minimal/Cargo.toml +++ b/examples/exex/minimal/Cargo.toml @@ -12,7 +12,7 @@ reth-node-api.workspace = true reth-node-core.workspace = true reth-node-ethereum.workspace = true reth-primitives.workspace = true -reth-provider.workspace = true +reth-tracing.workspace = true eyre.workspace = true tokio.workspace = true diff --git a/examples/exex/minimal/src/main.rs b/examples/exex/minimal/src/main.rs index 1c2463cda..18d3acd2c 100644 --- a/examples/exex/minimal/src/main.rs +++ b/examples/exex/minimal/src/main.rs @@ -1,8 +1,8 @@ use futures::Future; -use reth_exex::{ExExContext, ExExEvent}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::FullNodeComponents; use reth_node_ethereum::EthereumNode; -use reth_provider::CanonStateNotification; +use reth_tracing::tracing::info; /// The initialization logic of the ExEx is just an async function. /// @@ -21,19 +21,20 @@ async fn exex_init( async fn exex(mut ctx: ExExContext) -> eyre::Result<()> { while let Some(notification) = ctx.notifications.recv().await { match ¬ification { - CanonStateNotification::Commit { new } => { - println!("Received commit: {:?}", new.first().number..=new.tip().number); + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); } - CanonStateNotification::Reorg { old, new } => { - println!( - "Received reorg: {:?} -> {:?}", - old.first().number..=old.tip().number, - new.first().number..=new.tip().number - ); + ExExNotification::ChainReorged { old, new } => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + info!(reverted_chain = ?old.range(), "Received revert"); } }; - ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?; + if let Some(committed_chain) = notification.committed_chain() { + ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; + } } Ok(()) } diff --git a/examples/exex/op-bridge/src/main.rs b/examples/exex/op-bridge/src/main.rs index 92e6ef106..0f48b0a5f 100644 --- a/examples/exex/op-bridge/src/main.rs +++ b/examples/exex/op-bridge/src/main.rs @@ -94,7 +94,8 @@ async fn op_bridge_exex( ) -> eyre::Result<()> { // Process all new chain state notifications while let Some(notification) = ctx.notifications.recv().await { - if let Some(reverted_chain) = notification.reverted() { + // Revert all deposits and withdrawals + if let Some(reverted_chain) = notification.reverted_chain() { let events = decode_chain_into_events(&reverted_chain); let mut deposits = 0; @@ -126,22 +127,22 @@ async fn op_bridge_exex( } // Insert all new deposits and withdrawals - let committed_chain = notification.committed(); - let events = decode_chain_into_events(&committed_chain); + if let Some(committed_chain) = notification.committed_chain() { + let events = decode_chain_into_events(&committed_chain); - let mut deposits = 0; - let mut withdrawals = 0; + let mut deposits = 0; + let mut withdrawals = 0; - for (block, tx, log, event) in events { - match event { - // L1 -> L2 deposit - L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated { - amount, - from, - to, - .. - }) => { - let inserted = connection.execute( + for (block, tx, log, event) in events { + match event { + // L1 -> L2 deposit + L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated { + amount, + from, + to, + .. + }) => { + let inserted = connection.execute( r#" INSERT INTO deposits (block_number, tx_hash, contract_address, "from", "to", amount) VALUES (?, ?, ?, ?, ?, ?) @@ -155,16 +156,16 @@ async fn op_bridge_exex( amount.to_string(), ), )?; - deposits += inserted; - } - // L2 -> L1 withdrawal - L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized { - amount, - from, - to, - .. - }) => { - let inserted = connection.execute( + deposits += inserted; + } + // L2 -> L1 withdrawal + L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized { + amount, + from, + to, + .. + }) => { + let inserted = connection.execute( r#" INSERT INTO withdrawals (block_number, tx_hash, contract_address, "from", "to", amount) VALUES (?, ?, ?, ?, ?, ?) @@ -178,17 +179,18 @@ async fn op_bridge_exex( amount.to_string(), ), )?; - withdrawals += inserted; - } - _ => continue, - }; + withdrawals += inserted; + } + _ => continue, + }; + } + + info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events"); + + // Send a finished height event, signaling the node that we don't need any blocks below + // this height anymore + ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; } - - info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events"); - - // Send a finished height event, signaling the node that we don't need any blocks below - // this height anymore - ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?; } Ok(())