diff --git a/Cargo.lock b/Cargo.lock index 76be58ef9..6ebce860a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7579,6 +7579,7 @@ dependencies = [ "rmp-serde", "secp256k1", "tempfile", + "thiserror 2.0.11", "tokio", "tokio-util", "tracing", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index ad463a855..ec40b1c1a 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -47,6 +47,7 @@ itertools = { workspace = true, features = ["use_std"] } metrics.workspace = true parking_lot.workspace = true rmp-serde = "1.3" +thiserror.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 967075679..a7480ab74 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -657,6 +657,7 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { use super::*; + use crate::wal::WalResult; use alloy_primitives::B256; use futures::{StreamExt, TryStreamExt}; use rand::Rng; @@ -1356,7 +1357,7 @@ mod tests { ); // WAL shouldn't contain the genesis notification, because it's finalized assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); @@ -1364,7 +1365,7 @@ mod tests { assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); @@ -1378,7 +1379,7 @@ mod tests { // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification] ); diff --git a/crates/exex/exex/src/wal/error.rs b/crates/exex/exex/src/wal/error.rs new file mode 100644 index 000000000..b091890f6 --- /dev/null +++ b/crates/exex/exex/src/wal/error.rs @@ -0,0 +1,29 @@ +//! Wal Errors + +use std::path::PathBuf; + +/// Wal Result type. +pub type WalResult = Result; + +/// Wal Error types +#[derive(Debug, thiserror::Error)] +pub enum WalError { + /// Filesystem error at the path + #[error(transparent)] + FsPathError(#[from] reth_fs_util::FsPathError), + /// Directory entry reading error + #[error("failed to get {0} directory entry: {1}")] + DirEntry(PathBuf, std::io::Error), + /// Error when reading file metadata + #[error("failed to get metadata for file {0}: {1}")] + FileMetadata(u32, std::io::Error), + /// Parse error + #[error("failed to parse file name: {0}")] + Parse(String), + /// Notification not found error + #[error("notification {0} not found")] + FileNotFound(u32), + /// Decode error + #[error("failed to decode notification {0} from {1}: {2}")] + Decode(u32, PathBuf, rmp_serde::decode::Error), +} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 58fb12441..c22729510 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -8,6 +8,8 @@ use reth_primitives::EthPrimitives; pub use storage::Storage; mod metrics; use metrics::Metrics; +mod error; +pub use error::{WalError, WalResult}; use std::{ path::Path, @@ -43,7 +45,7 @@ where N: NodePrimitives, { /// Creates a new instance of [`Wal`]. - pub fn new(directory: impl AsRef) -> eyre::Result { + pub fn new(directory: impl AsRef) -> WalResult { Ok(Self { inner: Arc::new(WalInner::new(directory)?) }) } @@ -53,7 +55,7 @@ where } /// Commits the notification to WAL. - pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + pub fn commit(&self, notification: &ExExNotification) -> WalResult<()> { self.inner.commit(notification) } @@ -61,14 +63,14 @@ where /// /// The caller should check that all ExExes are on the canonical chain and will not need any /// blocks from the WAL below the provided block, inclusive. - pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { + pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> { self.inner.finalize(to_block) } /// Returns an iterator over all notifications in the WAL. pub fn iter_notifications( &self, - ) -> eyre::Result>> + '_>> { + ) -> WalResult>> + '_>> { self.inner.iter_notifications() } @@ -93,7 +95,7 @@ impl WalInner where N: NodePrimitives, { - fn new(directory: impl AsRef) -> eyre::Result { + fn new(directory: impl AsRef) -> WalResult { let mut wal = Self { next_file_id: AtomicU32::new(0), storage: Storage::new(directory)?, @@ -110,7 +112,7 @@ where /// Fills the block cache with the notifications from the storage. #[instrument(skip(self))] - fn fill_block_cache(&mut self) -> eyre::Result<()> { + fn fill_block_cache(&mut self) -> WalResult<()> { let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); @@ -145,7 +147,7 @@ where reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] - fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + fn commit(&self, notification: &ExExNotification) -> WalResult<()> { let mut block_cache = self.block_cache.write(); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); @@ -160,7 +162,7 @@ where } #[instrument(skip(self))] - fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { + fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> { let mut block_cache = self.block_cache.write(); let file_ids = block_cache.remove_before(to_block.number); @@ -195,7 +197,7 @@ where /// Returns an iterator over all notifications in the WAL. fn iter_notifications( &self, - ) -> eyre::Result>> + '_>> { + ) -> WalResult>> + '_>> { let Some(range) = self.storage.files_range()? else { return Ok(Box::new(std::iter::empty())) }; @@ -218,7 +220,7 @@ where pub fn get_committed_notification_by_block_hash( &self, block_hash: &B256, - ) -> eyre::Result>> { + ) -> WalResult>> { let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash) else { return Ok(None) @@ -233,7 +235,7 @@ where #[cfg(test)] mod tests { - use crate::wal::{cache::CachedBlock, Wal}; + use crate::wal::{cache::CachedBlock, error::WalResult, Wal}; use alloy_primitives::B256; use itertools::Itertools; use reth_exex_types::ExExNotification; @@ -243,7 +245,7 @@ mod tests { }; use std::sync::Arc; - fn read_notifications(wal: &Wal) -> eyre::Result> { + fn read_notifications(wal: &Wal) -> WalResult> { wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| { wal.inner .storage diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index e437fcd7f..bab835505 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -4,7 +4,7 @@ use std::{ path::{Path, PathBuf}, }; -use eyre::OptionExt; +use crate::wal::{WalError, WalResult}; use reth_exex_types::ExExNotification; use reth_node_api::NodePrimitives; use reth_primitives::EthPrimitives; @@ -30,7 +30,7 @@ where { /// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// it doesn't exist. - pub(super) fn new(path: impl AsRef) -> eyre::Result { + pub(super) fn new(path: impl AsRef) -> WalResult { reth_fs_util::create_dir_all(&path)?; Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData }) @@ -40,11 +40,11 @@ where self.path.join(format!("{id}.{FILE_EXTENSION}")) } - fn parse_filename(filename: &str) -> eyre::Result { + fn parse_filename(filename: &str) -> WalResult { filename .strip_suffix(".wal") .and_then(|s| s.parse().ok()) - .ok_or_eyre(format!("failed to parse file name: {filename}")) + .ok_or_else(|| WalError::Parse(filename.to_string())) } /// Removes notification for the given file ID from the storage. @@ -72,12 +72,12 @@ where /// Returns the range of file IDs in the storage. /// /// If there are no files in the storage, returns `None`. - pub(super) fn files_range(&self) -> eyre::Result>> { + pub(super) fn files_range(&self) -> WalResult>> { let mut min_id = None; let mut max_id = None; for entry in reth_fs_util::read_dir(&self.path)? { - let entry = entry?; + let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?; if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) { let file_name = entry.file_name(); @@ -99,7 +99,7 @@ where pub(super) fn remove_notifications( &self, file_ids: impl IntoIterator, - ) -> eyre::Result<(usize, u64)> { + ) -> WalResult<(usize, u64)> { let mut deleted_total = 0; let mut deleted_size = 0; @@ -116,10 +116,10 @@ where pub(super) fn iter_notifications( &self, range: RangeInclusive, - ) -> impl Iterator)>> + '_ { + ) -> impl Iterator)>> + '_ { range.map(move |id| { let (notification, size) = - self.read_notification(id)?.ok_or_eyre(format!("notification {id} not found"))?; + self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?; Ok((id, size, notification)) }) @@ -130,7 +130,7 @@ where pub(super) fn read_notification( &self, file_id: u32, - ) -> eyre::Result, u64)>> { + ) -> WalResult, u64)>> { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL"); @@ -139,13 +139,12 @@ where Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()), }; - let size = file.metadata()?.len(); + let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> = - rmp_serde::decode::from_read(&mut file).map_err(|err| { - eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}") - })?; + rmp_serde::decode::from_read(&mut file) + .map_err(|err| WalError::Decode(file_id, file_path, err))?; Ok(Some((notification.into(), size))) } @@ -160,7 +159,7 @@ where &self, file_id: u32, notification: &ExExNotification, - ) -> eyre::Result { + ) -> WalResult { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); @@ -172,7 +171,7 @@ where rmp_serde::encode::write(file, ¬ification) })?; - Ok(file_path.metadata()?.len()) + Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len()) } }