diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index bdb2d4c27..9af12e260 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use reth_exex_types::ExExHead; use reth_node_api::{FullNodeComponents, NodeTypes, NodeTypesWithEngine}; use reth_node_core::node_config::NodeConfig; use reth_primitives::Head; @@ -32,7 +33,7 @@ pub struct ExExContext { /// considered delivered by the node. pub notifications: ExExNotifications, - /// node components + /// Node components pub components: Node, } @@ -92,4 +93,16 @@ impl ExExContext { pub fn task_executor(&self) -> &TaskExecutor { self.components.task_executor() } + + /// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of + /// notifications without a head. + pub fn set_notifications_without_head(&mut self) { + self.notifications.set_without_head(); + } + + /// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications + /// with the provided head. + pub fn set_notifications_with_head(&mut self, head: ExExHead) { + self.notifications.set_with_head(head); + } } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index e8e24c09d..31dc82222 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -610,12 +610,16 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use alloy_primitives::B256; - use eyre::OptionExt; - use futures::{FutureExt, StreamExt}; + use futures::StreamExt; use rand::Rng; + use reth_db_common::init::init_genesis; + use reth_evm_ethereum::execute::EthExecutorProvider; use reth_primitives::SealedBlockWithSenders; - use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain}; - use reth_testing_utils::generators::{self, random_block}; + use reth_provider::{ + providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, + Chain, TransactionVariant, + }; + use reth_testing_utils::generators; fn empty_finalized_header_stream() -> ForkChoiceStream { let (tx, rx) = watch::channel(None); @@ -975,11 +979,20 @@ mod tests { #[tokio::test] async fn exex_handle_new() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -1008,7 +1021,7 @@ mod tests { // Send a notification and ensure it's received correctly match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -1021,11 +1034,20 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random())); @@ -1046,11 +1068,7 @@ mod tests { poll_fn(|cx| { // The notification should be skipped, so nothing should be sent. // Check that the receiver channel is indeed empty - assert_eq!( - notifications.poll_next_unpin(cx), - Poll::Pending, - "Receiver channel should be empty" - ); + assert!(notifications.poll_next_unpin(cx).is_pending()); Poll::Ready(()) }) .await; @@ -1066,11 +1084,20 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -1086,7 +1113,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -1100,11 +1127,20 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -1117,7 +1153,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -1135,30 +1171,34 @@ mod tests { let mut rng = generators::rng(); + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory).unwrap(); + let genesis_block = provider_factory + .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash) + .unwrap() + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let provider_factory = create_test_provider_factory(); - - let block = random_block(&mut rng, 0, Default::default()) - .seal_with_senders() - .ok_or_eyre("failed to recover senders")?; - let provider_rw = provider_factory.provider_rw()?; - provider_rw.insert_block(block.clone())?; - provider_rw.commit()?; + let (exex_handle, events_tx, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider.clone(), + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), + new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)), }; let (finalized_headers_tx, rx) = watch::channel(None); let finalized_header_stream = ForkChoiceStream::new(rx); - let (exex_handle, events_tx, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); - let mut exex_manager = std::pin::pin!(ExExManager::new( - provider_factory, + provider, vec![exex_handle], 1, wal, @@ -1170,16 +1210,13 @@ mod tests { exex_manager.handle().send(notification.clone())?; assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); - assert_eq!( - notifications.next().poll_unpin(&mut cx), - Poll::Ready(Some(notification.clone())) - ); + assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone()); assert_eq!( exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event assert_eq!( @@ -1192,7 +1229,7 @@ mod tests { .send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into())) .unwrap(); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block @@ -1202,9 +1239,9 @@ mod tests { ); // Send a `FinishedHeight` event with a canonical block - events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap(); + events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap(); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL is finalized assert!(exex_manager.wal.iter_notifications()?.next().is_none()); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 116dac954..6efdb1775 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -13,8 +13,129 @@ use std::{ }; use tokio::sync::mpsc::Receiver; -/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the +/// stream is configured with a head via [`ExExNotifications::set_with_head`] or +/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head. +#[derive(Debug)] pub struct ExExNotifications { + inner: ExExNotificationsInner, +} + +#[derive(Debug)] +enum ExExNotificationsInner { + /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. + WithoutHead(ExExNotificationsWithoutHead), + /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that + /// are committed or reverted after the given head. + WithHead(ExExNotificationsWithHead), + /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and + /// [`ExExNotificationsInner::WithHead`]. + Invalid, +} + +impl ExExNotifications { + /// Creates a new stream of [`ExExNotifications`] without a head. + pub const fn new( + node_head: Head, + provider: P, + executor: E, + notifications: Receiver, + wal_handle: WalHandle, + ) -> Self { + Self { + inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new( + node_head, + provider, + executor, + notifications, + wal_handle, + )), + } + } + + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head. + /// + /// It's a no-op if the stream has already been configured without a head. + /// + /// See the documentation of [`ExExNotificationsWithoutHead`] for more details. + pub fn set_without_head(&mut self) { + let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); + self.inner = ExExNotificationsInner::WithoutHead(match current { + ExExNotificationsInner::WithoutHead(notifications) => notifications, + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + ), + ExExNotificationsInner::Invalid => unreachable!(), + }); + } + + /// Returns a new [`ExExNotifications`] without a head. + /// + /// See the documentation of [`ExExNotificationsWithoutHead`] for more details. + pub fn without_head(mut self) -> Self { + self.set_without_head(); + self + } + + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head. + /// + /// It's a no-op if the stream has already been configured with a head. + /// + /// See the documentation of [`ExExNotificationsWithHead`] for more details. + pub fn set_with_head(&mut self, exex_head: ExExHead) { + let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); + self.inner = ExExNotificationsInner::WithHead(match current { + ExExNotificationsInner::WithoutHead(notifications) => { + notifications.with_head(exex_head) + } + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + exex_head, + ), + ExExNotificationsInner::Invalid => unreachable!(), + }); + } + + /// Returns a new [`ExExNotifications`] with the provided head. + /// + /// See the documentation of [`ExExNotificationsWithHead`] for more details. + pub fn with_head(mut self, exex_head: ExExHead) -> Self { + self.set_with_head(exex_head); + self + } +} + +impl Stream for ExExNotifications +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ + type Item = eyre::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.get_mut().inner { + ExExNotificationsInner::WithoutHead(notifications) => { + notifications.poll_next_unpin(cx).map(|result| result.map(Ok)) + } + ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx), + ExExNotificationsInner::Invalid => unreachable!(), + } + } +} + +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. +pub struct ExExNotificationsWithoutHead { node_head: Head, provider: P, executor: E, @@ -22,7 +143,7 @@ pub struct ExExNotifications { wal_handle: WalHandle, } -impl Debug for ExExNotifications { +impl Debug for ExExNotificationsWithoutHead { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExNotifications") .field("provider", &self.provider) @@ -32,9 +153,9 @@ impl Debug for ExExNotifications { } } -impl ExExNotifications { - /// Creates a new instance of [`ExExNotifications`]. - pub const fn new( +impl ExExNotificationsWithoutHead { + /// Creates a new instance of [`ExExNotificationsWithoutHead`]. + const fn new( node_head: Head, provider: P, executor: E, @@ -44,70 +165,8 @@ impl ExExNotifications { Self { node_head, provider, executor, notifications, wal_handle } } - /// Receives the next value for this receiver. - /// - /// This method returns `None` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. This indicates that no - /// further values can ever be received from this `Receiver`. The channel is - /// closed when all senders have been dropped, or when [`Receiver::close`] is called. - /// - /// # Cancel safety - /// - /// This method is cancel safe. If `recv` is used as the event in a - /// [`tokio::select!`] statement and some other branch - /// completes first, it is guaranteed that no messages were received on this - /// channel. - /// - /// For full documentation, see [`Receiver::recv`]. - #[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")] - pub async fn recv(&mut self) -> Option { - self.notifications.recv().await - } - - /// Polls to receive the next message on this channel. - /// - /// This method returns: - /// - /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a - /// spurious failure happens. - /// * `Poll::Ready(Some(message))` if a message is available. - /// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was - /// closed have been received. - /// - /// When the method returns `Poll::Pending`, the `Waker` in the provided - /// `Context` is scheduled to receive a wakeup when a message is sent on any - /// receiver, or when the channel is closed. Note that on multiple calls to - /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` - /// passed to the most recent call is scheduled to receive a wakeup. - /// - /// If this method returns `Poll::Pending` due to a spurious failure, then - /// the `Waker` will be notified when the situation causing the spurious - /// failure has been resolved. Note that receiving such a wakeup does not - /// guarantee that the next call will succeed — it could fail with another - /// spurious failure. - /// - /// For full documentation, see [`Receiver::poll_recv`]. - #[deprecated( - note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead" - )] - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.notifications.poll_recv(cx) - } -} - -impl ExExNotifications -where - P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, -{ - /// Subscribe to notifications with the given head. This head is the ExEx's - /// latest view of the host chain. - /// - /// Notifications will be sent starting from the head, not inclusive. For - /// example, if `head.number == 10`, then the first notification will be - /// with `block.number == 11`. A `head.number` of 10 indicates that the ExEx - /// has processed up to block 10, and is ready to process block 11. - pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + /// Subscribe to notifications with the given head. + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new( self.node_head, self.provider, @@ -119,7 +178,7 @@ where } } -impl Stream for ExExNotifications { +impl Stream for ExExNotificationsWithoutHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -128,7 +187,13 @@ impl Stream for ExExNotifications { } /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are -/// committed or reverted after the given head. +/// committed or reverted after the given head. The head is the ExEx's latest view of the host +/// chain. +/// +/// Notifications will be sent starting from the head, not inclusive. For example, if +/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An +/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to +/// process block 11. #[derive(Debug)] pub struct ExExNotificationsWithHead { node_head: Head, @@ -147,13 +212,9 @@ pub struct ExExNotificationsWithHead { backfill_job: Option>, } -impl ExExNotificationsWithHead -where - P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, -{ +impl ExExNotificationsWithHead { /// Creates a new [`ExExNotificationsWithHead`]. - pub const fn new( + const fn new( node_head: Head, provider: P, executor: E, @@ -173,7 +234,13 @@ where backfill_job: None, } } +} +impl ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ /// Checks if the ExEx head is on the canonical chain. /// /// If the head block is not found in the database or it's ahead of the node head, it means @@ -367,7 +434,7 @@ mod tests { notifications_tx.send(notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -438,7 +505,7 @@ mod tests { notifications_tx.send(notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -528,7 +595,7 @@ mod tests { notifications_tx.send(new_notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -609,7 +676,7 @@ mod tests { notifications_tx.send(new_notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(),