diff --git a/Cargo.lock b/Cargo.lock index 568267403..c39be6072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2692,7 +2692,6 @@ dependencies = [ "reth-chainspec", "reth-node-core", "reth-node-ethereum", - "reth-primitives", "serde_json", "tokio", ] @@ -7330,7 +7329,6 @@ dependencies = [ "alloy-eips", "alloy-genesis", "alloy-primitives", - "dashmap 6.1.0", "eyre", "futures", "itertools 0.13.0", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 0e76ef3d4..9c3d47365 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -42,7 +42,6 @@ tokio-util.workspace = true tokio.workspace = true ## misc -dashmap.workspace = true eyre.workspace = true itertools.workspace = true metrics.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 44b08b097..e7c9a6504 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1124,7 +1124,7 @@ mod tests { let mut rng = generators::rng(); let temp_dir = tempfile::tempdir().unwrap(); - let mut wal = Wal::new(temp_dir.path()).unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 9e9ee78e6..5440e57b1 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -457,7 +457,7 @@ mod tests { let mut rng = generators::rng(); let temp_dir = tempfile::tempdir().unwrap(); - let mut wal = Wal::new(temp_dir.path()).unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; @@ -557,7 +557,7 @@ mod tests { let mut rng = generators::rng(); let temp_dir = tempfile::tempdir().unwrap(); - let mut wal = Wal::new(temp_dir.path()).unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 097a07c7a..861ae9b50 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -1,98 +1,55 @@ -use std::collections::{BTreeMap, VecDeque}; +use std::{ + cmp::Reverse, + collections::{BinaryHeap, HashSet}, +}; use alloy_eips::BlockNumHash; -use alloy_primitives::B256; -use dashmap::DashMap; -use parking_lot::RwLock; +use alloy_primitives::{map::FbHashMap, BlockNumber, B256}; use reth_exex_types::ExExNotification; /// The block cache of the WAL. /// /// This cache is needed to avoid walking the WAL directory every time we want to find a /// notification corresponding to a block or a block corresponding to a hash. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct BlockCache { - /// A mapping of `File ID -> List of Blocks`. - /// - /// For each notification written to the WAL, there will be an entry per block written to - /// the cache with the same file ID. I.e. for each notification, there may be multiple blocks - /// in the cache. - files: RwLock>>, - /// A mapping of committed blocks `Block Hash -> (File ID, Block)`. + /// A min heap of `(Block Number, File ID)` tuples. + pub(super) blocks: BinaryHeap>, + /// A mapping of committed blocks `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// block. - committed_blocks: DashMap, + pub(super) committed_blocks: FbHashMap<32, (u64, CachedBlock)>, } impl BlockCache { - /// Creates a new instance of [`BlockCache`]. - pub(super) fn new() -> Self { - Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() } - } - /// Returns `true` if the cache is empty. pub(super) fn is_empty(&self) -> bool { - self.files.read().is_empty() + self.blocks.is_empty() } - /// Returns a front-to-back iterator. - pub(super) fn iter(&self) -> impl Iterator + '_ { - self.files - .read() - .iter() - .flat_map(|(k, v)| v.iter().map(move |b| (*k, *b))) - .collect::>() - .into_iter() - } + /// Removes all files from the cache that has notifications with a tip block less than or equal + /// to the given block number. + /// + /// # Returns + /// + /// A set of file IDs that were removed. + pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet { + let mut file_ids = HashSet::default(); - /// Provides a reference to the first block from the cache, or `None` if the cache is - /// empty. - pub(super) fn front(&self) -> Option<(u64, CachedBlock)> { - self.files.read().first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b))) - } - - /// Provides a reference to the last block from the cache, or `None` if the cache is - /// empty. - pub(super) fn back(&self) -> Option<(u64, CachedBlock)> { - self.files.read().last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b))) - } - - /// Removes the notification with the given file ID. - pub(super) fn remove_notification(&self, key: u64) -> Option> { - self.files.write().remove(&key) - } - - /// Pops the first block from the cache. If it resulted in the whole file entry being empty, - /// it will also remove the file entry. - pub(super) fn pop_front(&self) -> Option<(u64, CachedBlock)> { - let mut files = self.files.write(); - - let first_entry = files.first_entry()?; - let key = *first_entry.key(); - let blocks = first_entry.into_mut(); - let first_block = blocks.pop_front().unwrap(); - if blocks.is_empty() { - files.remove(&key); + while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() { + if max_block <= block_number { + let popped_block = self.blocks.pop().unwrap(); + debug_assert_eq!(popped_block, block); + file_ids.insert(file_id); + } else { + break + } } - Some((key, first_block)) - } + self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id)); - /// Pops the last block from the cache. If it resulted in the whole file entry being empty, - /// it will also remove the file entry. - pub(super) fn pop_back(&self) -> Option<(u64, CachedBlock)> { - let mut files = self.files.write(); - - let last_entry = files.last_entry()?; - let key = *last_entry.key(); - let blocks = last_entry.into_mut(); - let last_block = blocks.pop_back().unwrap(); - if blocks.is_empty() { - files.remove(&key); - } - - Some((key, last_block)) + file_ids } /// Returns the file ID for the notification containing the given committed block hash, if it @@ -102,59 +59,52 @@ impl BlockCache { } /// Inserts the blocks from the notification into the cache with the given file ID. - /// - /// First, inserts the reverted blocks (if any), then the committed blocks (if any). pub(super) fn insert_notification_blocks_with_file_id( - &self, + &mut self, file_id: u64, notification: &ExExNotification, ) { - let mut files = self.files.write(); - let reverted_chain = notification.reverted_chain(); let committed_chain = notification.committed_chain(); - if let Some(reverted_chain) = reverted_chain { - for block in reverted_chain.blocks().values() { - files.entry(file_id).or_default().push_back(CachedBlock { - action: CachedBlockAction::Revert, - block: (block.number, block.hash()).into(), - parent_hash: block.parent_hash, - }); - } + let max_block = + reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max(); + if let Some(max_block) = max_block { + self.blocks.push(Reverse((max_block, file_id))); } - if let Some(committed_chain) = committed_chain { + if let Some(committed_chain) = &committed_chain { for block in committed_chain.blocks().values() { let cached_block = CachedBlock { - action: CachedBlockAction::Commit, block: (block.number, block.hash()).into(), parent_hash: block.parent_hash, }; - files.entry(file_id).or_default().push_back(cached_block); self.committed_blocks.insert(block.hash(), (file_id, cached_block)); } } } + + #[cfg(test)] + pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u64)> { + self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect() + } + + #[cfg(test)] + pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u64, CachedBlock)> { + use itertools::Itertools; + + self.committed_blocks + .iter() + .map(|(hash, (file_id, block))| (*hash, *file_id, *block)) + .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash)) + .collect() + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct CachedBlock { - pub(super) action: CachedBlockAction, /// The block number and hash of the block. pub(super) block: BlockNumHash, /// The hash of the parent block. pub(super) parent_hash: B256, } - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(super) enum CachedBlockAction { - Commit, - Revert, -} - -impl CachedBlockAction { - pub(super) const fn is_commit(&self) -> bool { - matches!(self, Self::Commit) - } -} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 06c2f6485..593b065da 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -3,10 +3,16 @@ mod cache; pub use cache::BlockCache; mod storage; -use eyre::OptionExt; +use parking_lot::{RwLock, RwLockReadGuard}; pub use storage::Storage; -use std::{path::Path, sync::Arc}; +use std::{ + path::Path, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use alloy_eips::BlockNumHash; use alloy_primitives::B256; @@ -40,16 +46,14 @@ impl Wal { } /// Commits the notification to WAL. - pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { + pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { self.inner.commit(notification) } - /// Finalizes the WAL to the given block, inclusive. + /// Finalizes the WAL up to the given canonical block, inclusive. /// - /// 1. Finds a notification with first unfinalized block (first notification containing a - /// committed block higher than `to_block`). - /// 2. Removes the notifications from the beginning of WAL until the found notification. If this - /// notification includes both finalized and non-finalized blocks, it will not be removed. + /// The caller should check that all ExExes are on the canonical chain and will not need any + /// blocks from the WAL below the provided block, inclusive. pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { self.inner.finalize(to_block) } @@ -65,24 +69,35 @@ impl Wal { /// Inner type for the WAL. #[derive(Debug)] struct WalInner { + next_file_id: AtomicUsize, /// The underlying WAL storage backed by a file. storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. - block_cache: BlockCache, + block_cache: RwLock, } impl WalInner { fn new(directory: impl AsRef) -> eyre::Result { - let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() }; + let mut wal = Self { + next_file_id: AtomicUsize::new(0), + storage: Storage::new(directory)?, + block_cache: RwLock::new(BlockCache::default()), + }; wal.fill_block_cache()?; Ok(wal) } + fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> { + self.block_cache.read() + } + /// Fills the block cache with the notifications from the storage. #[instrument(target = "exex::wal", skip(self))] fn fill_block_cache(&mut self) -> eyre::Result<()> { let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; + let mut block_cache = self.block_cache.write(); + for entry in self.storage.iter_notifications(files_range) { let (file_id, notification) = entry?; @@ -97,7 +112,9 @@ impl WalInner { "Inserting block cache entries" ); - self.block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification); + block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification); + + self.next_file_id.fetch_max(1, Ordering::Relaxed); } Ok(()) @@ -108,83 +125,30 @@ impl WalInner { committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { - let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1); + let mut block_cache = self.block_cache.write(); + + let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed) as u64; self.storage.write_notification(file_id, notification)?; debug!(?file_id, "Inserting notification blocks into the block cache"); - self.block_cache.insert_notification_blocks_with_file_id(file_id, notification); + block_cache.insert_notification_blocks_with_file_id(file_id, notification); Ok(()) } #[instrument(target = "exex::wal", skip(self))] fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { - // First, walk cache to find the file ID of the notification with the finalized block and - // save the file ID with the first unfinalized block. Do not remove any notifications - // yet. - let mut unfinalized_from_file_id = None; - { - let mut block_cache = self.block_cache.iter().peekable(); - while let Some((file_id, block)) = block_cache.next() { - debug!(?file_id, ?block, "Iterating over the block cache"); - if block.action.is_commit() && - block.block.number == to_block.number && - block.block.hash == to_block.hash - { - let notification = self - .storage - .read_notification(file_id)? - .ok_or_eyre("notification not found")?; - if notification.committed_chain().unwrap().blocks().len() == 1 { - unfinalized_from_file_id = Some( - block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX), - ); - } else { - unfinalized_from_file_id = Some(file_id); - } - - debug!( - ?file_id, - ?block, - ?unfinalized_from_file_id, - "Found the finalized block in the block cache" - ); - break - } - - unfinalized_from_file_id = Some(file_id); - } - } - - // If the finalized block is still not found, we can't do anything and just return. - let Some(remove_to_file_id) = unfinalized_from_file_id else { - debug!("Could not find the finalized block in WAL"); - return Ok(()) - }; - - // Remove notifications from the storage from the beginning up to the unfinalized block, not - // inclusive. - let (mut file_range_start, mut file_range_end) = (None, None); - while let Some((file_id, _)) = self.block_cache.front() { - if file_id == remove_to_file_id { - break - } - self.block_cache.pop_front(); - - file_range_start.get_or_insert(file_id); - file_range_end = Some(file_id); - } - debug!(?remove_to_file_id, "Block cache was finalized"); + let file_ids = self.block_cache.write().remove_before(to_block.number); // Remove notifications from the storage. - if let Some((file_range_start, file_range_end)) = file_range_start.zip(file_range_end) { - let removed_notifications = - self.storage.remove_notifications(file_range_start..=file_range_end)?; - debug!(?removed_notifications, "Storage was finalized"); - } else { + if file_ids.is_empty() { debug!("No notifications were finalized from the storage"); + return Ok(()) } + let removed_notifications = self.storage.remove_notifications(file_ids)?; + debug!(?removed_notifications, "Storage was finalized"); + Ok(()) } @@ -212,7 +176,7 @@ impl WalHandle { &self, block_hash: &B256, ) -> eyre::Result> { - let Some(file_id) = self.wal.block_cache.get_file_id_by_committed_block_hash(block_hash) + let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash) else { return Ok(None) }; @@ -225,17 +189,16 @@ impl WalHandle { mod tests { use std::sync::Arc; + use alloy_primitives::B256; use eyre::OptionExt; + use itertools::Itertools; use reth_exex_types::ExExNotification; use reth_provider::Chain; use reth_testing_utils::generators::{ self, random_block, random_block_range, BlockParams, BlockRangeParams, }; - use crate::wal::{ - cache::{CachedBlock, CachedBlockAction}, - Wal, - }; + use crate::wal::{cache::CachedBlock, Wal}; fn read_notifications(wal: &Wal) -> eyre::Result> { let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) }; @@ -247,6 +210,15 @@ mod tests { .collect::>() } + fn sort_committed_blocks( + committed_blocks: Vec<(B256, u64, CachedBlock)>, + ) -> Vec<(B256, u64, CachedBlock)> { + committed_blocks + .into_iter() + .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash)) + .collect() + } + #[test] fn test_wal() -> eyre::Result<()> { reth_tracing::init_test_tracing(); @@ -255,8 +227,8 @@ mod tests { // Create an instance of the WAL in a temporary directory let temp_dir = tempfile::tempdir()?; - let mut wal = Wal::new(&temp_dir)?; - assert!(wal.inner.block_cache.is_empty()); + let wal = Wal::new(&temp_dir)?; + assert!(wal.inner.block_cache().is_empty()); // Create 4 canonical blocks and one reorged block with number 2 let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default()) @@ -315,19 +287,20 @@ mod tests { // First notification (commit block 0, 1) let file_id = 0; - let committed_notification_1_cache = vec![ + let committed_notification_1_cache_blocks = (blocks[1].number, file_id); + let committed_notification_1_cache_committed_blocks = vec![ ( + blocks[0].hash(), file_id, CachedBlock { - action: CachedBlockAction::Commit, block: (blocks[0].number, blocks[0].hash()).into(), parent_hash: blocks[0].parent_hash, }, ), ( + blocks[1].hash(), file_id, CachedBlock { - action: CachedBlockAction::Commit, block: (blocks[1].number, blocks[1].hash()).into(), parent_hash: blocks[1].parent_hash, }, @@ -335,25 +308,26 @@ mod tests { ]; wal.commit(&committed_notification_1)?; assert_eq!( - wal.inner.block_cache.iter().collect::>(), - committed_notification_1_cache + wal.inner.block_cache().blocks_sorted(), + [committed_notification_1_cache_blocks] + ); + assert_eq!( + wal.inner.block_cache().committed_blocks_sorted(), + committed_notification_1_cache_committed_blocks ); assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); // Second notification (revert block 1) wal.commit(&reverted_notification)?; let file_id = 1; - let reverted_notification_cache = vec![( - file_id, - CachedBlock { - action: CachedBlockAction::Revert, - block: (blocks[1].number, blocks[1].hash()).into(), - parent_hash: blocks[1].parent_hash, - }, - )]; + let reverted_notification_cache_blocks = (blocks[1].number, file_id); assert_eq!( - wal.inner.block_cache.iter().collect::>(), - [committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat() + wal.inner.block_cache().blocks_sorted(), + [reverted_notification_cache_blocks, committed_notification_1_cache_blocks] + ); + assert_eq!( + wal.inner.block_cache().committed_blocks_sorted(), + committed_notification_1_cache_committed_blocks ); assert_eq!( read_notifications(&wal)?, @@ -363,32 +337,42 @@ mod tests { // Third notification (commit block 1, 2) wal.commit(&committed_notification_2)?; let file_id = 2; - let committed_notification_2_cache = vec![ + let committed_notification_2_cache_blocks = (blocks[2].number, file_id); + let committed_notification_2_cache_committed_blocks = vec![ ( + block_1_reorged.hash(), file_id, CachedBlock { - action: CachedBlockAction::Commit, block: (block_1_reorged.number, block_1_reorged.hash()).into(), parent_hash: block_1_reorged.parent_hash, }, ), ( + blocks[2].hash(), file_id, CachedBlock { - action: CachedBlockAction::Commit, block: (blocks[2].number, blocks[2].hash()).into(), parent_hash: blocks[2].parent_hash, }, ), ]; assert_eq!( - wal.inner.block_cache.iter().collect::>(), + wal.inner.block_cache().blocks_sorted(), [ - committed_notification_1_cache.clone(), - reverted_notification_cache.clone(), - committed_notification_2_cache.clone() + committed_notification_2_cache_blocks, + reverted_notification_cache_blocks, + committed_notification_1_cache_blocks, ] - .concat() + ); + assert_eq!( + wal.inner.block_cache().committed_blocks_sorted(), + sort_committed_blocks( + [ + committed_notification_1_cache_committed_blocks.clone(), + committed_notification_2_cache_committed_blocks.clone() + ] + .concat() + ) ); assert_eq!( read_notifications(&wal)?, @@ -402,47 +386,50 @@ mod tests { // Fourth notification (revert block 2, commit block 2, 3) wal.commit(&reorged_notification)?; let file_id = 3; - let reorged_notification_cache = vec![ + let reorged_notification_cache_blocks = (blocks[3].number, file_id); + let reorged_notification_cache_committed_blocks = vec![ ( + block_2_reorged.hash(), file_id, CachedBlock { - action: CachedBlockAction::Revert, - block: (blocks[2].number, blocks[2].hash()).into(), - parent_hash: blocks[2].parent_hash, - }, - ), - ( - file_id, - CachedBlock { - action: CachedBlockAction::Commit, block: (block_2_reorged.number, block_2_reorged.hash()).into(), parent_hash: block_2_reorged.parent_hash, }, ), ( + blocks[3].hash(), file_id, CachedBlock { - action: CachedBlockAction::Commit, block: (blocks[3].number, blocks[3].hash()).into(), parent_hash: blocks[3].parent_hash, }, ), ]; assert_eq!( - wal.inner.block_cache.iter().collect::>(), + wal.inner.block_cache().blocks_sorted(), [ - committed_notification_1_cache, - reverted_notification_cache, - committed_notification_2_cache.clone(), - reorged_notification_cache.clone() + reorged_notification_cache_blocks, + committed_notification_2_cache_blocks, + reverted_notification_cache_blocks, + committed_notification_1_cache_blocks, ] - .concat() + ); + assert_eq!( + wal.inner.block_cache().committed_blocks_sorted(), + sort_committed_blocks( + [ + committed_notification_1_cache_committed_blocks, + committed_notification_2_cache_committed_blocks.clone(), + reorged_notification_cache_committed_blocks.clone() + ] + .concat() + ) ); assert_eq!( read_notifications(&wal)?, vec![ - committed_notification_1.clone(), - reverted_notification.clone(), + committed_notification_1, + reverted_notification, committed_notification_2.clone(), reorged_notification.clone() ] @@ -454,8 +441,18 @@ mod tests { // the notifications before it. wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?; assert_eq!( - wal.inner.block_cache.iter().collect::>(), - [committed_notification_2_cache, reorged_notification_cache].concat() + wal.inner.block_cache().blocks_sorted(), + [reorged_notification_cache_blocks, committed_notification_2_cache_blocks] + ); + assert_eq!( + wal.inner.block_cache().committed_blocks_sorted(), + sort_committed_blocks( + [ + committed_notification_2_cache_committed_blocks, + reorged_notification_cache_committed_blocks + ] + .concat() + ) ); assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]); diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 817d57d19..7ae98077e 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -41,10 +41,16 @@ impl Storage { /// Removes notification for the given file ID from the storage. #[instrument(target = "exex::wal::storage", skip(self))] - fn remove_notification(&self, file_id: u64) { + fn remove_notification(&self, file_id: u64) -> bool { match reth_fs_util::remove_file(self.file_path(file_id)) { - Ok(()) => debug!("Notification was removed from the storage"), - Err(err) => debug!(?err, "Failed to remove notification from the storage"), + Ok(()) => { + debug!("Notification was removed from the storage"); + true + } + Err(err) => { + debug!(?err, "Failed to remove notification from the storage"); + false + } } } @@ -67,17 +73,24 @@ impl Storage { Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id)) } - /// Removes notifications from the storage according to the given range. + /// Removes notifications from the storage according to the given list of file IDs. /// /// # Returns /// /// Number of removed notifications. - pub(super) fn remove_notifications(&self, range: RangeInclusive) -> eyre::Result { - for id in range.clone() { - self.remove_notification(id); + pub(super) fn remove_notifications( + &self, + file_ids: impl IntoIterator, + ) -> eyre::Result { + let mut deleted = 0; + + for id in file_ids { + if self.remove_notification(id) { + deleted += 1; + } } - Ok(range.count()) + Ok(deleted) } pub(super) fn iter_notifications( @@ -91,7 +104,7 @@ impl Storage { }) } - /// Reads the notification from the file with the given id. + /// Reads the notification from the file with the given ID. #[instrument(target = "exex::wal::storage", skip(self))] pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result> { let file_path = self.file_path(file_id); @@ -107,7 +120,7 @@ impl Storage { Ok(serde_json::from_reader(&mut file)?) } - /// Writes the notification to the file with the given id. + /// Writes the notification to the file with the given ID. #[instrument(target = "exex::wal::storage", skip(self, notification))] pub(super) fn write_notification( &self, diff --git a/examples/custom-dev-node/Cargo.toml b/examples/custom-dev-node/Cargo.toml index 2586795b4..8ed277686 100644 --- a/examples/custom-dev-node/Cargo.toml +++ b/examples/custom-dev-node/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true reth.workspace = true reth-chainspec.workspace = true reth-node-core.workspace = true -reth-primitives.workspace = true reth-node-ethereum = { workspace = true, features = ["test-utils"] } futures-util.workspace = true