mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(exex): finalize WAL below the given block (#11324)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -2692,7 +2692,6 @@ dependencies = [
|
|||||||
"reth-chainspec",
|
"reth-chainspec",
|
||||||
"reth-node-core",
|
"reth-node-core",
|
||||||
"reth-node-ethereum",
|
"reth-node-ethereum",
|
||||||
"reth-primitives",
|
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
@ -7330,7 +7329,6 @@ dependencies = [
|
|||||||
"alloy-eips",
|
"alloy-eips",
|
||||||
"alloy-genesis",
|
"alloy-genesis",
|
||||||
"alloy-primitives",
|
"alloy-primitives",
|
||||||
"dashmap 6.1.0",
|
|
||||||
"eyre",
|
"eyre",
|
||||||
"futures",
|
"futures",
|
||||||
"itertools 0.13.0",
|
"itertools 0.13.0",
|
||||||
|
|||||||
@ -42,7 +42,6 @@ tokio-util.workspace = true
|
|||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
|
|
||||||
## misc
|
## misc
|
||||||
dashmap.workspace = true
|
|
||||||
eyre.workspace = true
|
eyre.workspace = true
|
||||||
itertools.workspace = true
|
itertools.workspace = true
|
||||||
metrics.workspace = true
|
metrics.workspace = true
|
||||||
|
|||||||
@ -1124,7 +1124,7 @@ mod tests {
|
|||||||
let mut rng = generators::rng();
|
let mut rng = generators::rng();
|
||||||
|
|
||||||
let temp_dir = tempfile::tempdir().unwrap();
|
let temp_dir = tempfile::tempdir().unwrap();
|
||||||
let mut wal = Wal::new(temp_dir.path()).unwrap();
|
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||||
|
|
||||||
let provider_factory = create_test_provider_factory();
|
let provider_factory = create_test_provider_factory();
|
||||||
|
|
||||||
|
|||||||
@ -457,7 +457,7 @@ mod tests {
|
|||||||
let mut rng = generators::rng();
|
let mut rng = generators::rng();
|
||||||
|
|
||||||
let temp_dir = tempfile::tempdir().unwrap();
|
let temp_dir = tempfile::tempdir().unwrap();
|
||||||
let mut wal = Wal::new(temp_dir.path()).unwrap();
|
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||||
|
|
||||||
let provider_factory = create_test_provider_factory();
|
let provider_factory = create_test_provider_factory();
|
||||||
let genesis_hash = init_genesis(&provider_factory)?;
|
let genesis_hash = init_genesis(&provider_factory)?;
|
||||||
@ -557,7 +557,7 @@ mod tests {
|
|||||||
let mut rng = generators::rng();
|
let mut rng = generators::rng();
|
||||||
|
|
||||||
let temp_dir = tempfile::tempdir().unwrap();
|
let temp_dir = tempfile::tempdir().unwrap();
|
||||||
let mut wal = Wal::new(temp_dir.path()).unwrap();
|
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||||
|
|
||||||
let provider_factory = create_test_provider_factory();
|
let provider_factory = create_test_provider_factory();
|
||||||
let genesis_hash = init_genesis(&provider_factory)?;
|
let genesis_hash = init_genesis(&provider_factory)?;
|
||||||
|
|||||||
@ -1,98 +1,55 @@
|
|||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::{
|
||||||
|
cmp::Reverse,
|
||||||
|
collections::{BinaryHeap, HashSet},
|
||||||
|
};
|
||||||
|
|
||||||
use alloy_eips::BlockNumHash;
|
use alloy_eips::BlockNumHash;
|
||||||
use alloy_primitives::B256;
|
use alloy_primitives::{map::FbHashMap, BlockNumber, B256};
|
||||||
use dashmap::DashMap;
|
|
||||||
use parking_lot::RwLock;
|
|
||||||
use reth_exex_types::ExExNotification;
|
use reth_exex_types::ExExNotification;
|
||||||
|
|
||||||
/// The block cache of the WAL.
|
/// The block cache of the WAL.
|
||||||
///
|
///
|
||||||
/// This cache is needed to avoid walking the WAL directory every time we want to find a
|
/// This cache is needed to avoid walking the WAL directory every time we want to find a
|
||||||
/// notification corresponding to a block or a block corresponding to a hash.
|
/// notification corresponding to a block or a block corresponding to a hash.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Default)]
|
||||||
pub struct BlockCache {
|
pub struct BlockCache {
|
||||||
/// A mapping of `File ID -> List of Blocks`.
|
/// A min heap of `(Block Number, File ID)` tuples.
|
||||||
///
|
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u64)>>,
|
||||||
/// For each notification written to the WAL, there will be an entry per block written to
|
/// A mapping of committed blocks `Block Hash -> Block`.
|
||||||
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
|
|
||||||
/// in the cache.
|
|
||||||
files: RwLock<BTreeMap<u64, VecDeque<CachedBlock>>>,
|
|
||||||
/// A mapping of committed blocks `Block Hash -> (File ID, Block)`.
|
|
||||||
///
|
///
|
||||||
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
|
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
|
||||||
/// block.
|
/// block.
|
||||||
committed_blocks: DashMap<B256, (u64, CachedBlock)>,
|
pub(super) committed_blocks: FbHashMap<32, (u64, CachedBlock)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockCache {
|
impl BlockCache {
|
||||||
/// Creates a new instance of [`BlockCache`].
|
|
||||||
pub(super) fn new() -> Self {
|
|
||||||
Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `true` if the cache is empty.
|
/// Returns `true` if the cache is empty.
|
||||||
pub(super) fn is_empty(&self) -> bool {
|
pub(super) fn is_empty(&self) -> bool {
|
||||||
self.files.read().is_empty()
|
self.blocks.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a front-to-back iterator.
|
/// Removes all files from the cache that has notifications with a tip block less than or equal
|
||||||
pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ {
|
/// to the given block number.
|
||||||
self.files
|
///
|
||||||
.read()
|
/// # Returns
|
||||||
.iter()
|
///
|
||||||
.flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
|
/// A set of file IDs that were removed.
|
||||||
.collect::<Vec<_>>()
|
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u64> {
|
||||||
.into_iter()
|
let mut file_ids = HashSet::default();
|
||||||
}
|
|
||||||
|
|
||||||
/// Provides a reference to the first block from the cache, or `None` if the cache is
|
while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
|
||||||
/// empty.
|
if max_block <= block_number {
|
||||||
pub(super) fn front(&self) -> Option<(u64, CachedBlock)> {
|
let popped_block = self.blocks.pop().unwrap();
|
||||||
self.files.read().first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
|
debug_assert_eq!(popped_block, block);
|
||||||
}
|
file_ids.insert(file_id);
|
||||||
|
} else {
|
||||||
/// Provides a reference to the last block from the cache, or `None` if the cache is
|
break
|
||||||
/// empty.
|
}
|
||||||
pub(super) fn back(&self) -> Option<(u64, CachedBlock)> {
|
|
||||||
self.files.read().last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes the notification with the given file ID.
|
|
||||||
pub(super) fn remove_notification(&self, key: u64) -> Option<VecDeque<CachedBlock>> {
|
|
||||||
self.files.write().remove(&key)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Pops the first block from the cache. If it resulted in the whole file entry being empty,
|
|
||||||
/// it will also remove the file entry.
|
|
||||||
pub(super) fn pop_front(&self) -> Option<(u64, CachedBlock)> {
|
|
||||||
let mut files = self.files.write();
|
|
||||||
|
|
||||||
let first_entry = files.first_entry()?;
|
|
||||||
let key = *first_entry.key();
|
|
||||||
let blocks = first_entry.into_mut();
|
|
||||||
let first_block = blocks.pop_front().unwrap();
|
|
||||||
if blocks.is_empty() {
|
|
||||||
files.remove(&key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Some((key, first_block))
|
self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
|
||||||
}
|
|
||||||
|
|
||||||
/// Pops the last block from the cache. If it resulted in the whole file entry being empty,
|
file_ids
|
||||||
/// it will also remove the file entry.
|
|
||||||
pub(super) fn pop_back(&self) -> Option<(u64, CachedBlock)> {
|
|
||||||
let mut files = self.files.write();
|
|
||||||
|
|
||||||
let last_entry = files.last_entry()?;
|
|
||||||
let key = *last_entry.key();
|
|
||||||
let blocks = last_entry.into_mut();
|
|
||||||
let last_block = blocks.pop_back().unwrap();
|
|
||||||
if blocks.is_empty() {
|
|
||||||
files.remove(&key);
|
|
||||||
}
|
|
||||||
|
|
||||||
Some((key, last_block))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the file ID for the notification containing the given committed block hash, if it
|
/// Returns the file ID for the notification containing the given committed block hash, if it
|
||||||
@ -102,59 +59,52 @@ impl BlockCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Inserts the blocks from the notification into the cache with the given file ID.
|
/// Inserts the blocks from the notification into the cache with the given file ID.
|
||||||
///
|
|
||||||
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
|
|
||||||
pub(super) fn insert_notification_blocks_with_file_id(
|
pub(super) fn insert_notification_blocks_with_file_id(
|
||||||
&self,
|
&mut self,
|
||||||
file_id: u64,
|
file_id: u64,
|
||||||
notification: &ExExNotification,
|
notification: &ExExNotification,
|
||||||
) {
|
) {
|
||||||
let mut files = self.files.write();
|
|
||||||
|
|
||||||
let reverted_chain = notification.reverted_chain();
|
let reverted_chain = notification.reverted_chain();
|
||||||
let committed_chain = notification.committed_chain();
|
let committed_chain = notification.committed_chain();
|
||||||
|
|
||||||
if let Some(reverted_chain) = reverted_chain {
|
let max_block =
|
||||||
for block in reverted_chain.blocks().values() {
|
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
|
||||||
files.entry(file_id).or_default().push_back(CachedBlock {
|
if let Some(max_block) = max_block {
|
||||||
action: CachedBlockAction::Revert,
|
self.blocks.push(Reverse((max_block, file_id)));
|
||||||
block: (block.number, block.hash()).into(),
|
|
||||||
parent_hash: block.parent_hash,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(committed_chain) = committed_chain {
|
if let Some(committed_chain) = &committed_chain {
|
||||||
for block in committed_chain.blocks().values() {
|
for block in committed_chain.blocks().values() {
|
||||||
let cached_block = CachedBlock {
|
let cached_block = CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (block.number, block.hash()).into(),
|
block: (block.number, block.hash()).into(),
|
||||||
parent_hash: block.parent_hash,
|
parent_hash: block.parent_hash,
|
||||||
};
|
};
|
||||||
files.entry(file_id).or_default().push_back(cached_block);
|
|
||||||
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
|
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u64)> {
|
||||||
|
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u64, CachedBlock)> {
|
||||||
|
use itertools::Itertools;
|
||||||
|
|
||||||
|
self.committed_blocks
|
||||||
|
.iter()
|
||||||
|
.map(|(hash, (file_id, block))| (*hash, *file_id, *block))
|
||||||
|
.sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub(super) struct CachedBlock {
|
pub(super) struct CachedBlock {
|
||||||
pub(super) action: CachedBlockAction,
|
|
||||||
/// The block number and hash of the block.
|
/// The block number and hash of the block.
|
||||||
pub(super) block: BlockNumHash,
|
pub(super) block: BlockNumHash,
|
||||||
/// The hash of the parent block.
|
/// The hash of the parent block.
|
||||||
pub(super) parent_hash: B256,
|
pub(super) parent_hash: B256,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
||||||
pub(super) enum CachedBlockAction {
|
|
||||||
Commit,
|
|
||||||
Revert,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CachedBlockAction {
|
|
||||||
pub(super) const fn is_commit(&self) -> bool {
|
|
||||||
matches!(self, Self::Commit)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -3,10 +3,16 @@
|
|||||||
mod cache;
|
mod cache;
|
||||||
pub use cache::BlockCache;
|
pub use cache::BlockCache;
|
||||||
mod storage;
|
mod storage;
|
||||||
use eyre::OptionExt;
|
use parking_lot::{RwLock, RwLockReadGuard};
|
||||||
pub use storage::Storage;
|
pub use storage::Storage;
|
||||||
|
|
||||||
use std::{path::Path, sync::Arc};
|
use std::{
|
||||||
|
path::Path,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
use alloy_eips::BlockNumHash;
|
use alloy_eips::BlockNumHash;
|
||||||
use alloy_primitives::B256;
|
use alloy_primitives::B256;
|
||||||
@ -40,16 +46,14 @@ impl Wal {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Commits the notification to WAL.
|
/// Commits the notification to WAL.
|
||||||
pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
|
pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||||
self.inner.commit(notification)
|
self.inner.commit(notification)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Finalizes the WAL to the given block, inclusive.
|
/// Finalizes the WAL up to the given canonical block, inclusive.
|
||||||
///
|
///
|
||||||
/// 1. Finds a notification with first unfinalized block (first notification containing a
|
/// The caller should check that all ExExes are on the canonical chain and will not need any
|
||||||
/// committed block higher than `to_block`).
|
/// blocks from the WAL below the provided block, inclusive.
|
||||||
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this
|
|
||||||
/// notification includes both finalized and non-finalized blocks, it will not be removed.
|
|
||||||
pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
|
pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
|
||||||
self.inner.finalize(to_block)
|
self.inner.finalize(to_block)
|
||||||
}
|
}
|
||||||
@ -65,24 +69,35 @@ impl Wal {
|
|||||||
/// Inner type for the WAL.
|
/// Inner type for the WAL.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct WalInner {
|
struct WalInner {
|
||||||
|
next_file_id: AtomicUsize,
|
||||||
/// The underlying WAL storage backed by a file.
|
/// The underlying WAL storage backed by a file.
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
|
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
|
||||||
block_cache: BlockCache,
|
block_cache: RwLock<BlockCache>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WalInner {
|
impl WalInner {
|
||||||
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||||
let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() };
|
let mut wal = Self {
|
||||||
|
next_file_id: AtomicUsize::new(0),
|
||||||
|
storage: Storage::new(directory)?,
|
||||||
|
block_cache: RwLock::new(BlockCache::default()),
|
||||||
|
};
|
||||||
wal.fill_block_cache()?;
|
wal.fill_block_cache()?;
|
||||||
Ok(wal)
|
Ok(wal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> {
|
||||||
|
self.block_cache.read()
|
||||||
|
}
|
||||||
|
|
||||||
/// Fills the block cache with the notifications from the storage.
|
/// Fills the block cache with the notifications from the storage.
|
||||||
#[instrument(target = "exex::wal", skip(self))]
|
#[instrument(target = "exex::wal", skip(self))]
|
||||||
fn fill_block_cache(&mut self) -> eyre::Result<()> {
|
fn fill_block_cache(&mut self) -> eyre::Result<()> {
|
||||||
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
|
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
|
||||||
|
|
||||||
|
let mut block_cache = self.block_cache.write();
|
||||||
|
|
||||||
for entry in self.storage.iter_notifications(files_range) {
|
for entry in self.storage.iter_notifications(files_range) {
|
||||||
let (file_id, notification) = entry?;
|
let (file_id, notification) = entry?;
|
||||||
|
|
||||||
@ -97,7 +112,9 @@ impl WalInner {
|
|||||||
"Inserting block cache entries"
|
"Inserting block cache entries"
|
||||||
);
|
);
|
||||||
|
|
||||||
self.block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
|
block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
|
||||||
|
|
||||||
|
self.next_file_id.fetch_max(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -108,83 +125,30 @@ impl WalInner {
|
|||||||
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
|
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
|
||||||
))]
|
))]
|
||||||
fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||||
let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1);
|
let mut block_cache = self.block_cache.write();
|
||||||
|
|
||||||
|
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed) as u64;
|
||||||
self.storage.write_notification(file_id, notification)?;
|
self.storage.write_notification(file_id, notification)?;
|
||||||
|
|
||||||
debug!(?file_id, "Inserting notification blocks into the block cache");
|
debug!(?file_id, "Inserting notification blocks into the block cache");
|
||||||
self.block_cache.insert_notification_blocks_with_file_id(file_id, notification);
|
block_cache.insert_notification_blocks_with_file_id(file_id, notification);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(target = "exex::wal", skip(self))]
|
#[instrument(target = "exex::wal", skip(self))]
|
||||||
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
|
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
|
||||||
// First, walk cache to find the file ID of the notification with the finalized block and
|
let file_ids = self.block_cache.write().remove_before(to_block.number);
|
||||||
// save the file ID with the first unfinalized block. Do not remove any notifications
|
|
||||||
// yet.
|
|
||||||
let mut unfinalized_from_file_id = None;
|
|
||||||
{
|
|
||||||
let mut block_cache = self.block_cache.iter().peekable();
|
|
||||||
while let Some((file_id, block)) = block_cache.next() {
|
|
||||||
debug!(?file_id, ?block, "Iterating over the block cache");
|
|
||||||
if block.action.is_commit() &&
|
|
||||||
block.block.number == to_block.number &&
|
|
||||||
block.block.hash == to_block.hash
|
|
||||||
{
|
|
||||||
let notification = self
|
|
||||||
.storage
|
|
||||||
.read_notification(file_id)?
|
|
||||||
.ok_or_eyre("notification not found")?;
|
|
||||||
if notification.committed_chain().unwrap().blocks().len() == 1 {
|
|
||||||
unfinalized_from_file_id = Some(
|
|
||||||
block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX),
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
unfinalized_from_file_id = Some(file_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
?file_id,
|
|
||||||
?block,
|
|
||||||
?unfinalized_from_file_id,
|
|
||||||
"Found the finalized block in the block cache"
|
|
||||||
);
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
unfinalized_from_file_id = Some(file_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the finalized block is still not found, we can't do anything and just return.
|
|
||||||
let Some(remove_to_file_id) = unfinalized_from_file_id else {
|
|
||||||
debug!("Could not find the finalized block in WAL");
|
|
||||||
return Ok(())
|
|
||||||
};
|
|
||||||
|
|
||||||
// Remove notifications from the storage from the beginning up to the unfinalized block, not
|
|
||||||
// inclusive.
|
|
||||||
let (mut file_range_start, mut file_range_end) = (None, None);
|
|
||||||
while let Some((file_id, _)) = self.block_cache.front() {
|
|
||||||
if file_id == remove_to_file_id {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
self.block_cache.pop_front();
|
|
||||||
|
|
||||||
file_range_start.get_or_insert(file_id);
|
|
||||||
file_range_end = Some(file_id);
|
|
||||||
}
|
|
||||||
debug!(?remove_to_file_id, "Block cache was finalized");
|
|
||||||
|
|
||||||
// Remove notifications from the storage.
|
// Remove notifications from the storage.
|
||||||
if let Some((file_range_start, file_range_end)) = file_range_start.zip(file_range_end) {
|
if file_ids.is_empty() {
|
||||||
let removed_notifications =
|
|
||||||
self.storage.remove_notifications(file_range_start..=file_range_end)?;
|
|
||||||
debug!(?removed_notifications, "Storage was finalized");
|
|
||||||
} else {
|
|
||||||
debug!("No notifications were finalized from the storage");
|
debug!("No notifications were finalized from the storage");
|
||||||
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let removed_notifications = self.storage.remove_notifications(file_ids)?;
|
||||||
|
debug!(?removed_notifications, "Storage was finalized");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,7 +176,7 @@ impl WalHandle {
|
|||||||
&self,
|
&self,
|
||||||
block_hash: &B256,
|
block_hash: &B256,
|
||||||
) -> eyre::Result<Option<ExExNotification>> {
|
) -> eyre::Result<Option<ExExNotification>> {
|
||||||
let Some(file_id) = self.wal.block_cache.get_file_id_by_committed_block_hash(block_hash)
|
let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
|
||||||
else {
|
else {
|
||||||
return Ok(None)
|
return Ok(None)
|
||||||
};
|
};
|
||||||
@ -225,17 +189,16 @@ impl WalHandle {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use alloy_primitives::B256;
|
||||||
use eyre::OptionExt;
|
use eyre::OptionExt;
|
||||||
|
use itertools::Itertools;
|
||||||
use reth_exex_types::ExExNotification;
|
use reth_exex_types::ExExNotification;
|
||||||
use reth_provider::Chain;
|
use reth_provider::Chain;
|
||||||
use reth_testing_utils::generators::{
|
use reth_testing_utils::generators::{
|
||||||
self, random_block, random_block_range, BlockParams, BlockRangeParams,
|
self, random_block, random_block_range, BlockParams, BlockRangeParams,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::wal::{
|
use crate::wal::{cache::CachedBlock, Wal};
|
||||||
cache::{CachedBlock, CachedBlockAction},
|
|
||||||
Wal,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
|
fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
|
||||||
let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) };
|
let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) };
|
||||||
@ -247,6 +210,15 @@ mod tests {
|
|||||||
.collect::<eyre::Result<_>>()
|
.collect::<eyre::Result<_>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sort_committed_blocks(
|
||||||
|
committed_blocks: Vec<(B256, u64, CachedBlock)>,
|
||||||
|
) -> Vec<(B256, u64, CachedBlock)> {
|
||||||
|
committed_blocks
|
||||||
|
.into_iter()
|
||||||
|
.sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_wal() -> eyre::Result<()> {
|
fn test_wal() -> eyre::Result<()> {
|
||||||
reth_tracing::init_test_tracing();
|
reth_tracing::init_test_tracing();
|
||||||
@ -255,8 +227,8 @@ mod tests {
|
|||||||
|
|
||||||
// Create an instance of the WAL in a temporary directory
|
// Create an instance of the WAL in a temporary directory
|
||||||
let temp_dir = tempfile::tempdir()?;
|
let temp_dir = tempfile::tempdir()?;
|
||||||
let mut wal = Wal::new(&temp_dir)?;
|
let wal = Wal::new(&temp_dir)?;
|
||||||
assert!(wal.inner.block_cache.is_empty());
|
assert!(wal.inner.block_cache().is_empty());
|
||||||
|
|
||||||
// Create 4 canonical blocks and one reorged block with number 2
|
// Create 4 canonical blocks and one reorged block with number 2
|
||||||
let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
|
let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
|
||||||
@ -315,19 +287,20 @@ mod tests {
|
|||||||
|
|
||||||
// First notification (commit block 0, 1)
|
// First notification (commit block 0, 1)
|
||||||
let file_id = 0;
|
let file_id = 0;
|
||||||
let committed_notification_1_cache = vec![
|
let committed_notification_1_cache_blocks = (blocks[1].number, file_id);
|
||||||
|
let committed_notification_1_cache_committed_blocks = vec![
|
||||||
(
|
(
|
||||||
|
blocks[0].hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (blocks[0].number, blocks[0].hash()).into(),
|
block: (blocks[0].number, blocks[0].hash()).into(),
|
||||||
parent_hash: blocks[0].parent_hash,
|
parent_hash: blocks[0].parent_hash,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
|
blocks[1].hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (blocks[1].number, blocks[1].hash()).into(),
|
block: (blocks[1].number, blocks[1].hash()).into(),
|
||||||
parent_hash: blocks[1].parent_hash,
|
parent_hash: blocks[1].parent_hash,
|
||||||
},
|
},
|
||||||
@ -335,25 +308,26 @@ mod tests {
|
|||||||
];
|
];
|
||||||
wal.commit(&committed_notification_1)?;
|
wal.commit(&committed_notification_1)?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
wal.inner.block_cache.iter().collect::<Vec<_>>(),
|
wal.inner.block_cache().blocks_sorted(),
|
||||||
committed_notification_1_cache
|
[committed_notification_1_cache_blocks]
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
wal.inner.block_cache().committed_blocks_sorted(),
|
||||||
|
committed_notification_1_cache_committed_blocks
|
||||||
);
|
);
|
||||||
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
|
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
|
||||||
|
|
||||||
// Second notification (revert block 1)
|
// Second notification (revert block 1)
|
||||||
wal.commit(&reverted_notification)?;
|
wal.commit(&reverted_notification)?;
|
||||||
let file_id = 1;
|
let file_id = 1;
|
||||||
let reverted_notification_cache = vec![(
|
let reverted_notification_cache_blocks = (blocks[1].number, file_id);
|
||||||
file_id,
|
|
||||||
CachedBlock {
|
|
||||||
action: CachedBlockAction::Revert,
|
|
||||||
block: (blocks[1].number, blocks[1].hash()).into(),
|
|
||||||
parent_hash: blocks[1].parent_hash,
|
|
||||||
},
|
|
||||||
)];
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
wal.inner.block_cache.iter().collect::<Vec<_>>(),
|
wal.inner.block_cache().blocks_sorted(),
|
||||||
[committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat()
|
[reverted_notification_cache_blocks, committed_notification_1_cache_blocks]
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
wal.inner.block_cache().committed_blocks_sorted(),
|
||||||
|
committed_notification_1_cache_committed_blocks
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
read_notifications(&wal)?,
|
read_notifications(&wal)?,
|
||||||
@ -363,32 +337,42 @@ mod tests {
|
|||||||
// Third notification (commit block 1, 2)
|
// Third notification (commit block 1, 2)
|
||||||
wal.commit(&committed_notification_2)?;
|
wal.commit(&committed_notification_2)?;
|
||||||
let file_id = 2;
|
let file_id = 2;
|
||||||
let committed_notification_2_cache = vec![
|
let committed_notification_2_cache_blocks = (blocks[2].number, file_id);
|
||||||
|
let committed_notification_2_cache_committed_blocks = vec![
|
||||||
(
|
(
|
||||||
|
block_1_reorged.hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (block_1_reorged.number, block_1_reorged.hash()).into(),
|
block: (block_1_reorged.number, block_1_reorged.hash()).into(),
|
||||||
parent_hash: block_1_reorged.parent_hash,
|
parent_hash: block_1_reorged.parent_hash,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
|
blocks[2].hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (blocks[2].number, blocks[2].hash()).into(),
|
block: (blocks[2].number, blocks[2].hash()).into(),
|
||||||
parent_hash: blocks[2].parent_hash,
|
parent_hash: blocks[2].parent_hash,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
wal.inner.block_cache.iter().collect::<Vec<_>>(),
|
wal.inner.block_cache().blocks_sorted(),
|
||||||
[
|
[
|
||||||
committed_notification_1_cache.clone(),
|
committed_notification_2_cache_blocks,
|
||||||
reverted_notification_cache.clone(),
|
reverted_notification_cache_blocks,
|
||||||
committed_notification_2_cache.clone()
|
committed_notification_1_cache_blocks,
|
||||||
]
|
]
|
||||||
.concat()
|
);
|
||||||
|
assert_eq!(
|
||||||
|
wal.inner.block_cache().committed_blocks_sorted(),
|
||||||
|
sort_committed_blocks(
|
||||||
|
[
|
||||||
|
committed_notification_1_cache_committed_blocks.clone(),
|
||||||
|
committed_notification_2_cache_committed_blocks.clone()
|
||||||
|
]
|
||||||
|
.concat()
|
||||||
|
)
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
read_notifications(&wal)?,
|
read_notifications(&wal)?,
|
||||||
@ -402,47 +386,50 @@ mod tests {
|
|||||||
// Fourth notification (revert block 2, commit block 2, 3)
|
// Fourth notification (revert block 2, commit block 2, 3)
|
||||||
wal.commit(&reorged_notification)?;
|
wal.commit(&reorged_notification)?;
|
||||||
let file_id = 3;
|
let file_id = 3;
|
||||||
let reorged_notification_cache = vec![
|
let reorged_notification_cache_blocks = (blocks[3].number, file_id);
|
||||||
|
let reorged_notification_cache_committed_blocks = vec![
|
||||||
(
|
(
|
||||||
|
block_2_reorged.hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Revert,
|
|
||||||
block: (blocks[2].number, blocks[2].hash()).into(),
|
|
||||||
parent_hash: blocks[2].parent_hash,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
file_id,
|
|
||||||
CachedBlock {
|
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (block_2_reorged.number, block_2_reorged.hash()).into(),
|
block: (block_2_reorged.number, block_2_reorged.hash()).into(),
|
||||||
parent_hash: block_2_reorged.parent_hash,
|
parent_hash: block_2_reorged.parent_hash,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
|
blocks[3].hash(),
|
||||||
file_id,
|
file_id,
|
||||||
CachedBlock {
|
CachedBlock {
|
||||||
action: CachedBlockAction::Commit,
|
|
||||||
block: (blocks[3].number, blocks[3].hash()).into(),
|
block: (blocks[3].number, blocks[3].hash()).into(),
|
||||||
parent_hash: blocks[3].parent_hash,
|
parent_hash: blocks[3].parent_hash,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
wal.inner.block_cache.iter().collect::<Vec<_>>(),
|
wal.inner.block_cache().blocks_sorted(),
|
||||||
[
|
[
|
||||||
committed_notification_1_cache,
|
reorged_notification_cache_blocks,
|
||||||
reverted_notification_cache,
|
committed_notification_2_cache_blocks,
|
||||||
committed_notification_2_cache.clone(),
|
reverted_notification_cache_blocks,
|
||||||
reorged_notification_cache.clone()
|
committed_notification_1_cache_blocks,
|
||||||
]
|
]
|
||||||
.concat()
|
);
|
||||||
|
assert_eq!(
|
||||||
|
wal.inner.block_cache().committed_blocks_sorted(),
|
||||||
|
sort_committed_blocks(
|
||||||
|
[
|
||||||
|
committed_notification_1_cache_committed_blocks,
|
||||||
|
committed_notification_2_cache_committed_blocks.clone(),
|
||||||
|
reorged_notification_cache_committed_blocks.clone()
|
||||||
|
]
|
||||||
|
.concat()
|
||||||
|
)
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
read_notifications(&wal)?,
|
read_notifications(&wal)?,
|
||||||
vec![
|
vec![
|
||||||
committed_notification_1.clone(),
|
committed_notification_1,
|
||||||
reverted_notification.clone(),
|
reverted_notification,
|
||||||
committed_notification_2.clone(),
|
committed_notification_2.clone(),
|
||||||
reorged_notification.clone()
|
reorged_notification.clone()
|
||||||
]
|
]
|
||||||
@ -454,8 +441,18 @@ mod tests {
|
|||||||
// the notifications before it.
|
// the notifications before it.
|
||||||
wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
|
wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
wal.inner.block_cache.iter().collect::<Vec<_>>(),
|
wal.inner.block_cache().blocks_sorted(),
|
||||||
[committed_notification_2_cache, reorged_notification_cache].concat()
|
[reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
wal.inner.block_cache().committed_blocks_sorted(),
|
||||||
|
sort_committed_blocks(
|
||||||
|
[
|
||||||
|
committed_notification_2_cache_committed_blocks,
|
||||||
|
reorged_notification_cache_committed_blocks
|
||||||
|
]
|
||||||
|
.concat()
|
||||||
|
)
|
||||||
);
|
);
|
||||||
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
|
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
|
||||||
|
|
||||||
|
|||||||
@ -41,10 +41,16 @@ impl Storage {
|
|||||||
|
|
||||||
/// Removes notification for the given file ID from the storage.
|
/// Removes notification for the given file ID from the storage.
|
||||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||||
fn remove_notification(&self, file_id: u64) {
|
fn remove_notification(&self, file_id: u64) -> bool {
|
||||||
match reth_fs_util::remove_file(self.file_path(file_id)) {
|
match reth_fs_util::remove_file(self.file_path(file_id)) {
|
||||||
Ok(()) => debug!("Notification was removed from the storage"),
|
Ok(()) => {
|
||||||
Err(err) => debug!(?err, "Failed to remove notification from the storage"),
|
debug!("Notification was removed from the storage");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
debug!(?err, "Failed to remove notification from the storage");
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,17 +73,24 @@ impl Storage {
|
|||||||
Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
|
Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes notifications from the storage according to the given range.
|
/// Removes notifications from the storage according to the given list of file IDs.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
///
|
///
|
||||||
/// Number of removed notifications.
|
/// Number of removed notifications.
|
||||||
pub(super) fn remove_notifications(&self, range: RangeInclusive<u64>) -> eyre::Result<usize> {
|
pub(super) fn remove_notifications(
|
||||||
for id in range.clone() {
|
&self,
|
||||||
self.remove_notification(id);
|
file_ids: impl IntoIterator<Item = u64>,
|
||||||
|
) -> eyre::Result<usize> {
|
||||||
|
let mut deleted = 0;
|
||||||
|
|
||||||
|
for id in file_ids {
|
||||||
|
if self.remove_notification(id) {
|
||||||
|
deleted += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(range.count())
|
Ok(deleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn iter_notifications(
|
pub(super) fn iter_notifications(
|
||||||
@ -91,7 +104,7 @@ impl Storage {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads the notification from the file with the given id.
|
/// Reads the notification from the file with the given ID.
|
||||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||||
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<Option<ExExNotification>> {
|
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<Option<ExExNotification>> {
|
||||||
let file_path = self.file_path(file_id);
|
let file_path = self.file_path(file_id);
|
||||||
@ -107,7 +120,7 @@ impl Storage {
|
|||||||
Ok(serde_json::from_reader(&mut file)?)
|
Ok(serde_json::from_reader(&mut file)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes the notification to the file with the given id.
|
/// Writes the notification to the file with the given ID.
|
||||||
#[instrument(target = "exex::wal::storage", skip(self, notification))]
|
#[instrument(target = "exex::wal::storage", skip(self, notification))]
|
||||||
pub(super) fn write_notification(
|
pub(super) fn write_notification(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@ -10,7 +10,6 @@ license.workspace = true
|
|||||||
reth.workspace = true
|
reth.workspace = true
|
||||||
reth-chainspec.workspace = true
|
reth-chainspec.workspace = true
|
||||||
reth-node-core.workspace = true
|
reth-node-core.workspace = true
|
||||||
reth-primitives.workspace = true
|
|
||||||
reth-node-ethereum = { workspace = true, features = ["test-utils"] }
|
reth-node-ethereum = { workspace = true, features = ["test-utils"] }
|
||||||
futures-util.workspace = true
|
futures-util.workspace = true
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user