From 2224e6c48b2c74eb3c3f11125de4fc8f9fde8720 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 25 Sep 2024 12:53:51 +0100 Subject: [PATCH] feat(exex): finalize ExEx WAL on new finalized block header (#11174) --- Cargo.lock | 2 + crates/exex/exex/Cargo.toml | 1 + crates/exex/exex/src/lib.rs | 1 + crates/exex/exex/src/manager.rs | 161 +++++++++++++++++++++-- crates/exex/exex/src/wal/cache.rs | 2 +- crates/exex/exex/src/wal/mod.rs | 34 +++-- crates/exex/exex/src/wal/storage.rs | 12 +- crates/node/builder/Cargo.toml | 3 +- crates/node/builder/src/launch/engine.rs | 2 +- crates/node/builder/src/launch/exex.rs | 25 +++- crates/node/builder/src/launch/mod.rs | 2 +- crates/node/core/src/dirs.rs | 5 + 12 files changed, 213 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da895a2b6..cd34b08f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7377,6 +7377,7 @@ dependencies = [ "futures", "metrics", "reth-blockchain-tree", + "reth-chain-state", "reth-chainspec", "reth-config", "reth-db-api", @@ -7755,6 +7756,7 @@ dependencies = [ "reth-auto-seal-consensus", "reth-beacon-consensus", "reth-blockchain-tree", + "reth-chain-state", "reth-chainspec", "reth-cli-util", "reth-config", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 9b6146d22..74f62904a 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] ## reth +reth-chain-state.workspace = true reth-chainspec.workspace = true reth-config.workspace = true reth-evm.workspace = true diff --git a/crates/exex/exex/src/lib.rs b/crates/exex/exex/src/lib.rs index d54bc3d9f..4a819767a 100644 --- a/crates/exex/exex/src/lib.rs +++ b/crates/exex/exex/src/lib.rs @@ -47,6 +47,7 @@ mod manager; pub use manager::*; mod wal; +pub use wal::*; // Re-export exex types #[doc(inline)] diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index f4c1687be..a775765c0 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,14 +1,17 @@ use crate::{ - BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob, + wal::Wal, BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, + StreamBackfillJob, }; use alloy_primitives::{BlockNumber, U256}; use eyre::OptionExt; use futures::{Stream, StreamExt}; use metrics::Gauge; +use reth_chain_state::ForkChoiceStream; use reth_chainspec::Head; use reth_evm::execute::BlockExecutorProvider; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; +use reth_primitives::SealedHeader; use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_tracing::tracing::debug; use std::{ @@ -530,6 +533,11 @@ pub struct ExExManager { /// The finished height of all `ExEx`'s. finished_height: watch::Sender, + /// Write-Ahead Log for the [`ExExNotification`]s. + wal: Wal, + /// A stream of finalized headers. + finalized_header_stream: ForkChoiceStream, + /// A handle to the `ExEx` manager. handle: ExExManagerHandle, /// Metrics for the `ExEx` manager. @@ -544,7 +552,12 @@ impl ExExManager { /// /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send /// notifications over [`ExExManagerHandle`]s until there is capacity again. - pub fn new(handles: Vec, max_capacity: usize) -> Self { + pub fn new( + handles: Vec, + max_capacity: usize, + wal: Wal, + finalized_header_stream: ForkChoiceStream, + ) -> Self { let num_exexs = handles.len(); let (handle_tx, handle_rx) = mpsc::unbounded_channel(); @@ -575,6 +588,9 @@ impl ExExManager { is_ready: is_ready_tx, finished_height: finished_height_tx, + wal, + finalized_header_stream, + handle: ExExManagerHandle { exex_tx: handle_tx, num_exexs, @@ -618,6 +634,16 @@ impl Future for ExExManager { type Output = eyre::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Drain the finalized header stream and grab the last finalized header + let mut last_finalized_header = None; + while let Poll::Ready(finalized_header) = self.finalized_header_stream.poll_next_unpin(cx) { + last_finalized_header = finalized_header; + } + // If there is a finalized header, finalize the WAL with it + if let Some(header) = last_finalized_header { + self.wal.finalize((header.number, header.hash()).into())?; + } + // drain handle notifications while self.buffer.len() < self.max_capacity { if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) { @@ -820,6 +846,13 @@ mod tests { }; use reth_testing_utils::generators::{self, random_block, BlockParams}; + fn empty_finalized_header_stream() -> ForkChoiceStream { + let (tx, rx) = watch::channel(None); + // Do not drop the sender, otherwise the receiver will always return an error + std::mem::forget(tx); + ForkChoiceStream::new(rx) + } + #[tokio::test] async fn test_delivers_events() { let (mut exex_handle, event_tx, mut _notification_rx) = @@ -833,30 +866,66 @@ mod tests { #[tokio::test] async fn test_has_exexs() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); - assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); + assert!(!ExExManager::new( + vec![], + 0, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream() + ) + .handle + .has_exexs()); - assert!(ExExManager::new(vec![exex_handle_1], 0).handle.has_exexs()); + assert!(ExExManager::new( + vec![exex_handle_1], + 0, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream() + ) + .handle + .has_exexs()); } #[tokio::test] async fn test_has_capacity() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); - assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); + assert!(!ExExManager::new( + vec![], + 0, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream() + ) + .handle + .has_capacity()); - assert!(ExExManager::new(vec![exex_handle_1], 10).handle.has_capacity()); + assert!(ExExManager::new( + vec![exex_handle_1], + 10, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream() + ) + .handle + .has_capacity()); } #[test] fn test_push_notification() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Create a mock ExExManager and add the exex_handle to it - let mut exex_manager = ExExManager::new(vec![exex_handle], 10); + let mut exex_manager = ExExManager::new( + vec![exex_handle], + 10, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); // Define the notification for testing let mut block1 = SealedBlockWithSenders::default(); @@ -898,11 +967,17 @@ mod tests { #[test] fn test_update_capacity() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; - let mut exex_manager = ExExManager::new(vec![exex_handle], max_capacity); + let mut exex_manager = ExExManager::new( + vec![exex_handle], + max_capacity, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); // Push some notifications to fill part of the buffer let mut block1 = SealedBlockWithSenders::default(); @@ -932,6 +1007,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); @@ -942,7 +1018,12 @@ mod tests { event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); // Create a mock ExExManager and add the exex_handle to it - let exex_manager = ExExManager::new(vec![exex_handle], 10); + let exex_manager = ExExManager::new( + vec![exex_handle], + 10, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); @@ -969,6 +1050,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { + let temp_dir = tempfile::tempdir().unwrap(); // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); @@ -979,7 +1061,12 @@ mod tests { event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx2.send(ExExEvent::FinishedHeight(10)).unwrap(); - let exex_manager = ExExManager::new(vec![exex_handle1, exex_handle2], 10); + let exex_manager = ExExManager::new( + vec![exex_handle1, exex_handle2], + 10, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); @@ -1002,6 +1089,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { + let temp_dir = tempfile::tempdir().unwrap(); // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); @@ -1015,7 +1103,12 @@ mod tests { event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx2.send(ExExEvent::FinishedHeight(100)).unwrap(); - let exex_manager = ExExManager::new(vec![exex_handle1, exex_handle2], 10); + let exex_manager = ExExManager::new( + vec![exex_handle1, exex_handle2], + 10, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); @@ -1042,12 +1135,18 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { + let temp_dir = tempfile::tempdir().unwrap(); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); // Create an ExExManager with a small max capacity let max_capacity = 2; - let mut exex_manager = ExExManager::new(vec![exex_handle_1], max_capacity); + let mut exex_manager = ExExManager::new( + vec![exex_handle_1], + max_capacity, + Wal::new(temp_dir.path()).unwrap(), + empty_finalized_header_stream(), + ); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); @@ -1223,6 +1322,44 @@ mod tests { assert_eq!(exex_handle.next_notification_id, 23); } + #[tokio::test] + async fn test_exex_wal_finalize() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + let temp_dir = tempfile::tempdir().unwrap(); + let mut wal = Wal::new(temp_dir.path()).unwrap(); + let block = random_block(&mut generators::rng(), 0, Default::default()) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), + }; + wal.commit(¬ification)?; + + let (tx, rx) = watch::channel(None); + let finalized_header_stream = ForkChoiceStream::new(rx); + + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + + let mut exex_manager = + std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream)); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + assert_eq!( + exex_manager.wal.iter_notifications()?.collect::>>()?, + [notification] + ); + + tx.send(Some(block.header.clone()))?; + + assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + assert!(exex_manager.wal.iter_notifications()?.next().is_none()); + + Ok(()) + } + #[tokio::test] async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { let mut rng = generators::rng(); diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 3e26fdcf4..25719d11b 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -12,7 +12,7 @@ use reth_primitives::BlockNumHash; /// This cache is needed to avoid walking the WAL directory every time we want to find a /// notification corresponding to a block. #[derive(Debug)] -pub(super) struct BlockCache(BTreeMap>); +pub struct BlockCache(BTreeMap>); impl BlockCache { /// Creates a new instance of [`BlockCache`]. diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index a5e0188ca..0b699883e 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -1,17 +1,17 @@ #![allow(dead_code)] mod cache; +pub use cache::BlockCache; mod storage; +pub use storage::Storage; use std::path::Path; -use cache::BlockCache; use reth_exex_types::ExExNotification; use reth_primitives::BlockNumHash; use reth_tracing::tracing::{debug, instrument}; -use storage::Storage; -/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx. +/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes. /// /// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache /// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory @@ -26,7 +26,7 @@ use storage::Storage; /// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the /// WAL. #[derive(Debug)] -pub(crate) struct Wal { +pub struct Wal { /// The underlying WAL storage backed by a file. storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. @@ -35,7 +35,7 @@ pub(crate) struct Wal { impl Wal { /// Creates a new instance of [`Wal`]. - pub(crate) fn new(directory: impl AsRef) -> eyre::Result { + pub fn new(directory: impl AsRef) -> eyre::Result { let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() }; wal.fill_block_cache()?; Ok(wal) @@ -71,8 +71,7 @@ impl Wal { reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] - pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { - debug!("Writing notification to WAL"); + pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1); self.storage.write_notification(file_id, notification)?; @@ -94,7 +93,7 @@ impl Wal { /// 1. The block number and hash of the lowest removed block. /// 2. The notifications that were removed. #[instrument(target = "exex::wal", skip(self))] - pub(crate) fn rollback( + pub fn rollback( &mut self, to_block: BlockNumHash, ) -> eyre::Result)>> { @@ -162,9 +161,9 @@ impl Wal { /// 2. Removes the notifications from the beginning of WAL until the found notification. If this /// notification includes both finalized and non-finalized blocks, it will not be removed. #[instrument(target = "exex::wal", skip(self))] - pub(crate) fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> { + pub fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> { // First, walk cache to find the file ID of the notification with the finalized block and - // save the file ID with the last unfinalized block. Do not remove any notifications + // save the file ID with the first unfinalized block. Do not remove any notifications // yet. let mut unfinalized_from_file_id = None; { @@ -177,7 +176,9 @@ impl Wal { { let notification = self.storage.read_notification(file_id)?; if notification.committed_chain().unwrap().blocks().len() == 1 { - unfinalized_from_file_id = block_cache.peek().map(|(file_id, _)| *file_id); + unfinalized_from_file_id = Some( + block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX), + ); } else { unfinalized_from_file_id = Some(file_id); } @@ -226,6 +227,17 @@ impl Wal { Ok(()) } + + /// Returns an iterator over all notifications in the WAL. + pub(crate) fn iter_notifications( + &self, + ) -> eyre::Result> + '_>> { + let Some(range) = self.storage.files_range()? else { + return Ok(Box::new(std::iter::empty())) + }; + + Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1)))) + } } #[cfg(test)] diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 283b303a3..766d70b07 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -15,7 +15,7 @@ use tracing::instrument; /// Each notification is represented by a single file that contains a MessagePack-encoded /// notification. #[derive(Debug)] -pub(super) struct Storage { +pub struct Storage { /// The path to the WAL file. path: PathBuf, } @@ -107,23 +107,25 @@ impl Storage { } /// Reads the notification from the file with the given id. + #[instrument(target = "exex::wal::storage", skip(self))] pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result { - debug!(?file_id, "Reading notification from WAL"); - let file_path = self.file_path(file_id); + debug!(?file_path, "Reading notification from WAL"); + let mut file = File::open(&file_path)?; read_notification(&mut file) } /// Writes the notification to the file with the given id. + #[instrument(target = "exex::wal::storage", skip(self, notification))] pub(super) fn write_notification( &self, file_id: u64, notification: &ExExNotification, ) -> eyre::Result<()> { - debug!(?file_id, "Writing notification to WAL"); - let file_path = self.file_path(file_id); + debug!(?file_path, "Writing notification to WAL"); + let mut file = File::create_new(&file_path)?; write_notification(&mut file, notification)?; diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 30aebfb76..1bf2ba233 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -16,6 +16,7 @@ workspace = true reth-auto-seal-consensus.workspace = true reth-beacon-consensus.workspace = true reth-blockchain-tree.workspace = true +reth-chain-state.workspace = true reth-chainspec.workspace = true reth-cli-util.workspace = true reth-config.workspace = true @@ -45,12 +46,12 @@ reth-payload-validator.workspace = true reth-primitives.workspace = true reth-provider.workspace = true reth-prune.workspace = true +reth-rpc = { workspace = true, features = ["js-tracer"] } reth-rpc-api.workspace = true reth-rpc-builder.workspace = true reth-rpc-engine-api.workspace = true reth-rpc-eth-types.workspace = true reth-rpc-layer.workspace = true -reth-rpc = { workspace = true, features = ["js-tracer"] } reth-stages.workspace = true reth-static-file.workspace = true reth-tasks.workspace = true diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 0f68772c4..708d791a0 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -146,7 +146,7 @@ where ctx.configs().clone(), ) .launch() - .await; + .await?; // create pipeline let network_client = ctx.components().network().fetch_client().await?; diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 8624e3795..d03720086 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -3,7 +3,9 @@ use std::{fmt, fmt::Debug}; use futures::future; -use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; +use reth_chain_state::ForkChoiceSubscriptions; +use reth_chainspec::EthChainSpec; +use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal}; use reth_node_api::{FullNodeComponents, NodeTypes}; use reth_primitives::Head; use reth_provider::CanonStateSubscriptions; @@ -35,12 +37,12 @@ impl ExExLauncher { /// /// Spawns all extensions and returns the handle to the exex manager if any extensions are /// installed. - pub async fn launch(self) -> Option { + pub async fn launch(self) -> eyre::Result> { let Self { head, extensions, components, config_container } = self; if extensions.is_empty() { // nothing to launch - return None + return Ok(None) } let mut exex_handles = Vec::with_capacity(extensions.len()); @@ -94,7 +96,20 @@ impl ExExLauncher { // spawn exex manager debug!(target: "reth::cli", "spawning exex manager"); // todo(onbjerg): rm magic number - let exex_manager = ExExManager::new(exex_handles, 1024); + let exex_wal = Wal::new( + config_container + .config + .datadir + .clone() + .resolve_datadir(config_container.config.chain.chain()) + .exex_wal(), + )?; + let exex_manager = ExExManager::new( + exex_handles, + 1024, + exex_wal, + components.provider().finalized_block_stream(), + ); let exex_manager_handle = exex_manager.handle(); components.task_executor().spawn_critical("exex manager", async move { exex_manager.await.expect("exex manager crashed"); @@ -117,7 +132,7 @@ impl ExExLauncher { info!(target: "reth::cli", "ExEx Manager started"); - Some(exex_manager_handle) + Ok(Some(exex_manager_handle)) } } diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 78697056d..9c7d562d1 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -184,7 +184,7 @@ where ctx.configs().clone(), ) .launch() - .await; + .await?; // create pipeline let network_client = ctx.components().network().fetch_client().await?; diff --git a/crates/node/core/src/dirs.rs b/crates/node/core/src/dirs.rs index c788f35da..4f8507c4e 100644 --- a/crates/node/core/src/dirs.rs +++ b/crates/node/core/src/dirs.rs @@ -350,6 +350,11 @@ impl ChainPath { pub fn invalid_block_hooks(&self) -> PathBuf { self.data_dir().join("invalid_block_hooks") } + + /// Returns the path to the ExEx WAL directory for this chain. + pub fn exex_wal(&self) -> PathBuf { + self.data_dir().join("exex/wal") + } } impl AsRef for ChainPath {