feat: introduce custom exex wal errors (#11789)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
caglarkaya
2025-01-28 17:18:33 +03:00
committed by GitHub
parent 36eec984a0
commit 77568f8d3e
6 changed files with 64 additions and 31 deletions

1
Cargo.lock generated
View File

@ -7579,6 +7579,7 @@ dependencies = [
"rmp-serde", "rmp-serde",
"secp256k1", "secp256k1",
"tempfile", "tempfile",
"thiserror 2.0.11",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",

View File

@ -47,6 +47,7 @@ itertools = { workspace = true, features = ["use_std"] }
metrics.workspace = true metrics.workspace = true
parking_lot.workspace = true parking_lot.workspace = true
rmp-serde = "1.3" rmp-serde = "1.3"
thiserror.workspace = true
tracing.workspace = true tracing.workspace = true
[dev-dependencies] [dev-dependencies]

View File

@ -657,6 +657,7 @@ impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::wal::WalResult;
use alloy_primitives::B256; use alloy_primitives::B256;
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use rand::Rng; use rand::Rng;
@ -1356,7 +1357,7 @@ mod tests {
); );
// WAL shouldn't contain the genesis notification, because it's finalized // WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!( assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?, exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification.clone()] [notification.clone()]
); );
@ -1364,7 +1365,7 @@ mod tests {
assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!( assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?, exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification.clone()] [notification.clone()]
); );
@ -1378,7 +1379,7 @@ mod tests {
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block // non-canonical block
assert_eq!( assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?, exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification] [notification]
); );

View File

@ -0,0 +1,29 @@
//! Wal Errors
use std::path::PathBuf;
/// Wal Result type.
pub type WalResult<T> = Result<T, WalError>;
/// Wal Error types
#[derive(Debug, thiserror::Error)]
pub enum WalError {
/// Filesystem error at the path
#[error(transparent)]
FsPathError(#[from] reth_fs_util::FsPathError),
/// Directory entry reading error
#[error("failed to get {0} directory entry: {1}")]
DirEntry(PathBuf, std::io::Error),
/// Error when reading file metadata
#[error("failed to get metadata for file {0}: {1}")]
FileMetadata(u32, std::io::Error),
/// Parse error
#[error("failed to parse file name: {0}")]
Parse(String),
/// Notification not found error
#[error("notification {0} not found")]
FileNotFound(u32),
/// Decode error
#[error("failed to decode notification {0} from {1}: {2}")]
Decode(u32, PathBuf, rmp_serde::decode::Error),
}

View File

@ -8,6 +8,8 @@ use reth_primitives::EthPrimitives;
pub use storage::Storage; pub use storage::Storage;
mod metrics; mod metrics;
use metrics::Metrics; use metrics::Metrics;
mod error;
pub use error::{WalError, WalResult};
use std::{ use std::{
path::Path, path::Path,
@ -43,7 +45,7 @@ where
N: NodePrimitives, N: NodePrimitives,
{ {
/// Creates a new instance of [`Wal`]. /// Creates a new instance of [`Wal`].
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> { pub fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
Ok(Self { inner: Arc::new(WalInner::new(directory)?) }) Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
} }
@ -53,7 +55,7 @@ where
} }
/// Commits the notification to WAL. /// Commits the notification to WAL.
pub fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> { pub fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
self.inner.commit(notification) self.inner.commit(notification)
} }
@ -61,14 +63,14 @@ where
/// ///
/// The caller should check that all ExExes are on the canonical chain and will not need any /// The caller should check that all ExExes are on the canonical chain and will not need any
/// blocks from the WAL below the provided block, inclusive. /// blocks from the WAL below the provided block, inclusive.
pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
self.inner.finalize(to_block) self.inner.finalize(to_block)
} }
/// Returns an iterator over all notifications in the WAL. /// Returns an iterator over all notifications in the WAL.
pub fn iter_notifications( pub fn iter_notifications(
&self, &self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> { ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
self.inner.iter_notifications() self.inner.iter_notifications()
} }
@ -93,7 +95,7 @@ impl<N> WalInner<N>
where where
N: NodePrimitives, N: NodePrimitives,
{ {
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> { fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
let mut wal = Self { let mut wal = Self {
next_file_id: AtomicU32::new(0), next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?, storage: Storage::new(directory)?,
@ -110,7 +112,7 @@ where
/// Fills the block cache with the notifications from the storage. /// Fills the block cache with the notifications from the storage.
#[instrument(skip(self))] #[instrument(skip(self))]
fn fill_block_cache(&mut self) -> eyre::Result<()> { fn fill_block_cache(&mut self) -> WalResult<()> {
let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
@ -145,7 +147,7 @@ where
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))] ))]
fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> { fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
let mut block_cache = self.block_cache.write(); let mut block_cache = self.block_cache.write();
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
@ -160,7 +162,7 @@ where
} }
#[instrument(skip(self))] #[instrument(skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
let mut block_cache = self.block_cache.write(); let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number); let file_ids = block_cache.remove_before(to_block.number);
@ -195,7 +197,7 @@ where
/// Returns an iterator over all notifications in the WAL. /// Returns an iterator over all notifications in the WAL.
fn iter_notifications( fn iter_notifications(
&self, &self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> { ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
let Some(range) = self.storage.files_range()? else { let Some(range) = self.storage.files_range()? else {
return Ok(Box::new(std::iter::empty())) return Ok(Box::new(std::iter::empty()))
}; };
@ -218,7 +220,7 @@ where
pub fn get_committed_notification_by_block_hash( pub fn get_committed_notification_by_block_hash(
&self, &self,
block_hash: &B256, block_hash: &B256,
) -> eyre::Result<Option<ExExNotification<N>>> { ) -> WalResult<Option<ExExNotification<N>>> {
let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash) let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
else { else {
return Ok(None) return Ok(None)
@ -233,7 +235,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::wal::{cache::CachedBlock, Wal}; use crate::wal::{cache::CachedBlock, error::WalResult, Wal};
use alloy_primitives::B256; use alloy_primitives::B256;
use itertools::Itertools; use itertools::Itertools;
use reth_exex_types::ExExNotification; use reth_exex_types::ExExNotification;
@ -243,7 +245,7 @@ mod tests {
}; };
use std::sync::Arc; use std::sync::Arc;
fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> { fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| { wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
wal.inner wal.inner
.storage .storage

View File

@ -4,7 +4,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use eyre::OptionExt; use crate::wal::{WalError, WalResult};
use reth_exex_types::ExExNotification; use reth_exex_types::ExExNotification;
use reth_node_api::NodePrimitives; use reth_node_api::NodePrimitives;
use reth_primitives::EthPrimitives; use reth_primitives::EthPrimitives;
@ -30,7 +30,7 @@ where
{ {
/// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// Creates a new instance of [`Storage`] backed by the file at the given path and creates
/// it doesn't exist. /// it doesn't exist.
pub(super) fn new(path: impl AsRef<Path>) -> eyre::Result<Self> { pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
reth_fs_util::create_dir_all(&path)?; reth_fs_util::create_dir_all(&path)?;
Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData }) Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
@ -40,11 +40,11 @@ where
self.path.join(format!("{id}.{FILE_EXTENSION}")) self.path.join(format!("{id}.{FILE_EXTENSION}"))
} }
fn parse_filename(filename: &str) -> eyre::Result<u32> { fn parse_filename(filename: &str) -> WalResult<u32> {
filename filename
.strip_suffix(".wal") .strip_suffix(".wal")
.and_then(|s| s.parse().ok()) .and_then(|s| s.parse().ok())
.ok_or_eyre(format!("failed to parse file name: {filename}")) .ok_or_else(|| WalError::Parse(filename.to_string()))
} }
/// Removes notification for the given file ID from the storage. /// Removes notification for the given file ID from the storage.
@ -72,12 +72,12 @@ where
/// Returns the range of file IDs in the storage. /// Returns the range of file IDs in the storage.
/// ///
/// If there are no files in the storage, returns `None`. /// If there are no files in the storage, returns `None`.
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u32>>> { pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
let mut min_id = None; let mut min_id = None;
let mut max_id = None; let mut max_id = None;
for entry in reth_fs_util::read_dir(&self.path)? { for entry in reth_fs_util::read_dir(&self.path)? {
let entry = entry?; let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) { if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
let file_name = entry.file_name(); let file_name = entry.file_name();
@ -99,7 +99,7 @@ where
pub(super) fn remove_notifications( pub(super) fn remove_notifications(
&self, &self,
file_ids: impl IntoIterator<Item = u32>, file_ids: impl IntoIterator<Item = u32>,
) -> eyre::Result<(usize, u64)> { ) -> WalResult<(usize, u64)> {
let mut deleted_total = 0; let mut deleted_total = 0;
let mut deleted_size = 0; let mut deleted_size = 0;
@ -116,10 +116,10 @@ where
pub(super) fn iter_notifications( pub(super) fn iter_notifications(
&self, &self,
range: RangeInclusive<u32>, range: RangeInclusive<u32>,
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification<N>)>> + '_ { ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
range.map(move |id| { range.map(move |id| {
let (notification, size) = let (notification, size) =
self.read_notification(id)?.ok_or_eyre(format!("notification {id} not found"))?; self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
Ok((id, size, notification)) Ok((id, size, notification))
}) })
@ -130,7 +130,7 @@ where
pub(super) fn read_notification( pub(super) fn read_notification(
&self, &self,
file_id: u32, file_id: u32,
) -> eyre::Result<Option<(ExExNotification<N>, u64)>> { ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
let file_path = self.file_path(file_id); let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL"); debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
@ -139,13 +139,12 @@ where
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()), Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
}; };
let size = file.metadata()?.len(); let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
// Deserialize using the bincode- and msgpack-compatible serde wrapper // Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> = let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
rmp_serde::decode::from_read(&mut file).map_err(|err| { rmp_serde::decode::from_read(&mut file)
eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}") .map_err(|err| WalError::Decode(file_id, file_path, err))?;
})?;
Ok(Some((notification.into(), size))) Ok(Some((notification.into(), size)))
} }
@ -160,7 +159,7 @@ where
&self, &self,
file_id: u32, file_id: u32,
notification: &ExExNotification<N>, notification: &ExExNotification<N>,
) -> eyre::Result<u64> { ) -> WalResult<u64> {
let file_path = self.file_path(file_id); let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
@ -172,7 +171,7 @@ where
rmp_serde::encode::write(file, &notification) rmp_serde::encode::write(file, &notification)
})?; })?;
Ok(file_path.metadata()?.len()) Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
} }
} }