mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(exex): write ahead log (#10995)
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -7375,6 +7375,7 @@ dependencies = [
|
||||
"reth-evm",
|
||||
"reth-evm-ethereum",
|
||||
"reth-exex-types",
|
||||
"reth-fs-util",
|
||||
"reth-metrics",
|
||||
"reth-node-api",
|
||||
"reth-node-core",
|
||||
@ -7389,8 +7390,11 @@ dependencies = [
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
"secp256k1",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@ -1,3 +1,3 @@
|
||||
msrv = "1.81"
|
||||
too-large-for-stack = 128
|
||||
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]
|
||||
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "WAL", "MessagePack"]
|
||||
|
||||
@ -16,7 +16,8 @@ workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-config.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-exex-types.workspace = true
|
||||
reth-exex-types = { workspace = true, features = ["serde"] }
|
||||
reth-fs-util.workspace = true
|
||||
reth-metrics.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-node-core.workspace = true
|
||||
@ -41,6 +42,8 @@ tokio.workspace = true
|
||||
## misc
|
||||
eyre.workspace = true
|
||||
metrics.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
reth-blockchain-tree.workspace = true
|
||||
@ -53,6 +56,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
reth-testing-utils.workspace = true
|
||||
|
||||
secp256k1.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
@ -46,6 +46,8 @@ pub use event::*;
|
||||
mod manager;
|
||||
pub use manager::*;
|
||||
|
||||
mod wal;
|
||||
|
||||
// Re-export exex types
|
||||
#[doc(inline)]
|
||||
pub use reth_exex_types::*;
|
||||
|
||||
137
crates/exex/exex/src/wal/cache.rs
Normal file
137
crates/exex/exex/src/wal/cache.rs
Normal file
@ -0,0 +1,137 @@
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_primitives::BlockNumHash;
|
||||
|
||||
/// The block cache of the WAL. Acts as a mapping of `File ID -> List of Blocks`.
|
||||
///
|
||||
/// For each notification written to the WAL, there will be an entry per block written to
|
||||
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks in the
|
||||
/// cache.
|
||||
///
|
||||
/// This cache is needed to avoid walking the WAL directory every time we want to find a
|
||||
/// notification corresponding to a block.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct BlockCache(BTreeMap<u64, VecDeque<CachedBlock>>);
|
||||
|
||||
impl BlockCache {
|
||||
/// Creates a new instance of [`BlockCache`].
|
||||
pub(super) const fn new() -> Self {
|
||||
Self(BTreeMap::new())
|
||||
}
|
||||
|
||||
/// Returns `true` if the cache is empty.
|
||||
pub(super) fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
/// Returns a front-to-back iterator.
|
||||
pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ {
|
||||
self.0.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
|
||||
}
|
||||
|
||||
/// Provides a reference to the first block from the cache, or `None` if the cache is
|
||||
/// empty.
|
||||
pub(super) fn front(&self) -> Option<(u64, CachedBlock)> {
|
||||
self.0.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
|
||||
}
|
||||
|
||||
/// Provides a reference to the last block from the cache, or `None` if the cache is
|
||||
/// empty.
|
||||
pub(super) fn back(&self) -> Option<(u64, CachedBlock)> {
|
||||
self.0.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
|
||||
}
|
||||
|
||||
/// Removes the notification with the given file ID.
|
||||
pub(super) fn remove_notification(&mut self, key: u64) -> Option<VecDeque<CachedBlock>> {
|
||||
self.0.remove(&key)
|
||||
}
|
||||
|
||||
/// Pops the first block from the cache. If it resulted in the whole file entry being empty,
|
||||
/// it will also remove the file entry.
|
||||
pub(super) fn pop_front(&mut self) -> Option<(u64, CachedBlock)> {
|
||||
let first_entry = self.0.first_entry()?;
|
||||
let key = *first_entry.key();
|
||||
let blocks = first_entry.into_mut();
|
||||
let first_block = blocks.pop_front().unwrap();
|
||||
if blocks.is_empty() {
|
||||
self.0.remove(&key);
|
||||
}
|
||||
|
||||
Some((key, first_block))
|
||||
}
|
||||
|
||||
/// Pops the last block from the cache. If it resulted in the whole file entry being empty,
|
||||
/// it will also remove the file entry.
|
||||
pub(super) fn pop_back(&mut self) -> Option<(u64, CachedBlock)> {
|
||||
let last_entry = self.0.last_entry()?;
|
||||
let key = *last_entry.key();
|
||||
let blocks = last_entry.into_mut();
|
||||
let last_block = blocks.pop_back().unwrap();
|
||||
if blocks.is_empty() {
|
||||
self.0.remove(&key);
|
||||
}
|
||||
|
||||
Some((key, last_block))
|
||||
}
|
||||
|
||||
/// Appends a block to the back of the specified file entry.
|
||||
pub(super) fn insert(&mut self, file_id: u64, block: CachedBlock) {
|
||||
self.0.entry(file_id).or_default().push_back(block);
|
||||
}
|
||||
|
||||
/// Inserts the blocks from the notification into the cache with the given file ID.
|
||||
///
|
||||
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
|
||||
pub(super) fn insert_notification_blocks_with_file_id(
|
||||
&mut self,
|
||||
file_id: u64,
|
||||
notification: &ExExNotification,
|
||||
) {
|
||||
let reverted_chain = notification.reverted_chain();
|
||||
let committed_chain = notification.committed_chain();
|
||||
|
||||
if let Some(reverted_chain) = reverted_chain {
|
||||
for block in reverted_chain.blocks().values() {
|
||||
self.insert(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Revert,
|
||||
block: (block.number, block.hash()).into(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(committed_chain) = committed_chain {
|
||||
for block in committed_chain.blocks().values() {
|
||||
self.insert(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (block.number, block.hash()).into(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) struct CachedBlock {
|
||||
pub(super) action: CachedBlockAction,
|
||||
/// The block number and hash of the block.
|
||||
pub(super) block: BlockNumHash,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) enum CachedBlockAction {
|
||||
Commit,
|
||||
Revert,
|
||||
}
|
||||
|
||||
impl CachedBlockAction {
|
||||
pub(super) const fn is_commit(&self) -> bool {
|
||||
matches!(self, Self::Commit)
|
||||
}
|
||||
}
|
||||
492
crates/exex/exex/src/wal/mod.rs
Normal file
492
crates/exex/exex/src/wal/mod.rs
Normal file
@ -0,0 +1,492 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
mod cache;
|
||||
mod storage;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use cache::BlockCache;
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_primitives::BlockNumHash;
|
||||
use reth_tracing::tracing::{debug, instrument};
|
||||
use storage::Storage;
|
||||
|
||||
/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx.
|
||||
///
|
||||
/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
|
||||
/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory
|
||||
/// and decoding notifications every time we want to rollback/finalize the WAL.
|
||||
///
|
||||
/// The expected mode of operation is as follows:
|
||||
/// 1. On every new canonical chain notification, call [`Wal::commit`].
|
||||
/// 2. When ExEx is on a wrong fork, rollback the WAL using [`Wal::rollback`]. The caller is
|
||||
/// expected to create reverts from the removed notifications and backfill the blocks between the
|
||||
/// returned block and the given rollback block. After that, commit new notifications as usual
|
||||
/// with [`Wal::commit`].
|
||||
/// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
|
||||
/// WAL.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Wal {
|
||||
/// The underlying WAL storage backed by a file.
|
||||
storage: Storage,
|
||||
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
|
||||
block_cache: BlockCache,
|
||||
}
|
||||
|
||||
impl Wal {
|
||||
/// Creates a new instance of [`Wal`].
|
||||
pub(crate) fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() };
|
||||
wal.fill_block_cache()?;
|
||||
Ok(wal)
|
||||
}
|
||||
|
||||
/// Fills the block cache with the notifications from the storage.
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
fn fill_block_cache(&mut self) -> eyre::Result<()> {
|
||||
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
|
||||
|
||||
for entry in self.storage.iter_notifications(files_range) {
|
||||
let (file_id, notification) = entry?;
|
||||
|
||||
let committed_chain = notification.committed_chain();
|
||||
let reverted_chain = notification.reverted_chain();
|
||||
|
||||
debug!(
|
||||
target: "exex::wal",
|
||||
?file_id,
|
||||
reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()),
|
||||
committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()),
|
||||
"Inserting block cache entries"
|
||||
);
|
||||
|
||||
self.block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commits the notification to WAL.
|
||||
#[instrument(target = "exex::wal", skip_all, fields(
|
||||
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
|
||||
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
|
||||
))]
|
||||
pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||
debug!("Writing notification to WAL");
|
||||
let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1);
|
||||
self.storage.write_notification(file_id, notification)?;
|
||||
|
||||
debug!(?file_id, "Inserting notification blocks into the block cache");
|
||||
self.block_cache.insert_notification_blocks_with_file_id(file_id, notification);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rollbacks the WAL to the given block, inclusive.
|
||||
///
|
||||
/// 1. Walks the WAL from the end and searches for the first notification where committed chain
|
||||
/// contains a block with the same number and hash as `to_block`.
|
||||
/// 2. If the notification is found, truncates the WAL. It means that if the found notification
|
||||
/// contains both given block and blocks before it, the whole notification will be truncated.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// 1. The block number and hash of the lowest removed block.
|
||||
/// 2. The notifications that were removed.
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
pub(crate) fn rollback(
|
||||
&mut self,
|
||||
to_block: BlockNumHash,
|
||||
) -> eyre::Result<Option<(BlockNumHash, Vec<ExExNotification>)>> {
|
||||
// First, pop items from the back of the cache until we find the notification with the
|
||||
// specified block. When found, save the file ID of that notification.
|
||||
let mut remove_from_file_id = None;
|
||||
let mut remove_to_file_id = None;
|
||||
let mut lowest_removed_block = None;
|
||||
while let Some((file_id, block)) = self.block_cache.pop_back() {
|
||||
debug!(?file_id, ?block, "Popped back block from the block cache");
|
||||
if block.action.is_commit() && block.block.number == to_block.number {
|
||||
debug!(
|
||||
?file_id,
|
||||
?block,
|
||||
?remove_from_file_id,
|
||||
?lowest_removed_block,
|
||||
"Found the requested block"
|
||||
);
|
||||
|
||||
if block.block.hash != to_block.hash {
|
||||
eyre::bail!("block hash mismatch in WAL")
|
||||
}
|
||||
|
||||
remove_from_file_id = Some(file_id);
|
||||
|
||||
let notification = self.storage.read_notification(file_id)?;
|
||||
lowest_removed_block = notification
|
||||
.committed_chain()
|
||||
.as_ref()
|
||||
.map(|chain| chain.first())
|
||||
.map(|block| (block.number, block.hash()).into());
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
remove_from_file_id = Some(file_id);
|
||||
remove_to_file_id.get_or_insert(file_id);
|
||||
}
|
||||
|
||||
// If the specified block is still not found, we can't do anything and just return. The
|
||||
// cache was empty.
|
||||
let Some((remove_from_file_id, remove_to_file_id)) =
|
||||
remove_from_file_id.zip(remove_to_file_id)
|
||||
else {
|
||||
debug!("No blocks were rolled back");
|
||||
return Ok(None)
|
||||
};
|
||||
|
||||
// Remove the rest of the block cache entries for the file ID that we found.
|
||||
self.block_cache.remove_notification(remove_from_file_id);
|
||||
debug!(?remove_from_file_id, "Block cache was rolled back");
|
||||
|
||||
// Remove notifications from the storage.
|
||||
let removed_notifications =
|
||||
self.storage.take_notifications(remove_from_file_id..=remove_to_file_id)?;
|
||||
debug!(removed_notifications = ?removed_notifications.len(), "Storage was rolled back");
|
||||
|
||||
Ok(Some((lowest_removed_block.expect("qed"), removed_notifications)))
|
||||
}
|
||||
|
||||
/// Finalizes the WAL to the given block, inclusive.
|
||||
///
|
||||
/// 1. Finds a notification with first unfinalized block (first notification containing a
|
||||
/// committed block higher than `to_block`).
|
||||
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this
|
||||
/// notification includes both finalized and non-finalized blocks, it will not be removed.
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
pub(crate) fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> {
|
||||
// First, walk cache to find the file ID of the notification with the finalized block and
|
||||
// save the file ID with the last unfinalized block. Do not remove any notifications
|
||||
// yet.
|
||||
let mut unfinalized_from_file_id = None;
|
||||
{
|
||||
let mut block_cache = self.block_cache.iter().peekable();
|
||||
while let Some((file_id, block)) = block_cache.next() {
|
||||
debug!(?file_id, ?block, "Iterating over the block cache");
|
||||
if block.action.is_commit() &&
|
||||
block.block.number == to_block.number &&
|
||||
block.block.hash == to_block.hash
|
||||
{
|
||||
let notification = self.storage.read_notification(file_id)?;
|
||||
if notification.committed_chain().unwrap().blocks().len() == 1 {
|
||||
unfinalized_from_file_id = block_cache.peek().map(|(file_id, _)| *file_id);
|
||||
} else {
|
||||
unfinalized_from_file_id = Some(file_id);
|
||||
}
|
||||
|
||||
debug!(
|
||||
?file_id,
|
||||
?block,
|
||||
?unfinalized_from_file_id,
|
||||
"Found the finalized block in the block cache"
|
||||
);
|
||||
break
|
||||
}
|
||||
|
||||
unfinalized_from_file_id = Some(file_id);
|
||||
}
|
||||
}
|
||||
|
||||
// If the finalized block is still not found, we can't do anything and just return.
|
||||
let Some(remove_to_file_id) = unfinalized_from_file_id else {
|
||||
debug!("Could not find the finalized block in WAL");
|
||||
return Ok(())
|
||||
};
|
||||
|
||||
// Remove notifications from the storage from the beginning up to the unfinalized block, not
|
||||
// inclusive.
|
||||
let (mut file_range_start, mut file_range_end) = (None, None);
|
||||
while let Some((file_id, _)) = self.block_cache.front() {
|
||||
if file_id == remove_to_file_id {
|
||||
break
|
||||
}
|
||||
self.block_cache.pop_front();
|
||||
|
||||
file_range_start.get_or_insert(file_id);
|
||||
file_range_end = Some(file_id);
|
||||
}
|
||||
debug!(?remove_to_file_id, "Block cache was finalized");
|
||||
|
||||
// Remove notifications from the storage.
|
||||
if let Some((file_range_start, file_range_end)) = file_range_start.zip(file_range_end) {
|
||||
let removed_notifications =
|
||||
self.storage.remove_notifications(file_range_start..=file_range_end)?;
|
||||
debug!(?removed_notifications, "Storage was finalized");
|
||||
} else {
|
||||
debug!("No notifications were finalized from the storage");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use eyre::OptionExt;
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_provider::Chain;
|
||||
use reth_testing_utils::generators::{
|
||||
self, random_block, random_block_range, BlockParams, BlockRangeParams,
|
||||
};
|
||||
|
||||
use crate::wal::{
|
||||
cache::{CachedBlock, CachedBlockAction},
|
||||
Wal,
|
||||
};
|
||||
|
||||
fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
|
||||
let Some(files_range) = wal.storage.files_range()? else { return Ok(Vec::new()) };
|
||||
|
||||
wal.storage
|
||||
.iter_notifications(files_range)
|
||||
.map(|entry| Ok(entry?.1))
|
||||
.collect::<eyre::Result<_>>()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wal() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let mut rng = generators::rng();
|
||||
|
||||
// Create an instance of the WAL in a temporary directory
|
||||
let temp_dir = tempfile::tempdir()?;
|
||||
let mut wal = Wal::new(&temp_dir)?;
|
||||
assert!(wal.block_cache.is_empty());
|
||||
|
||||
// Create 4 canonical blocks and one reorged block with number 2
|
||||
let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
|
||||
.into_iter()
|
||||
.map(|block| block.seal_with_senders().ok_or_eyre("failed to recover senders"))
|
||||
.collect::<eyre::Result<Vec<_>>>()?;
|
||||
let block_1_reorged = random_block(
|
||||
&mut rng,
|
||||
1,
|
||||
BlockParams { parent: Some(blocks[0].hash()), ..Default::default() },
|
||||
)
|
||||
.seal_with_senders()
|
||||
.ok_or_eyre("failed to recover senders")?;
|
||||
let block_2_reorged = random_block(
|
||||
&mut rng,
|
||||
2,
|
||||
BlockParams { parent: Some(blocks[1].hash()), ..Default::default() },
|
||||
)
|
||||
.seal_with_senders()
|
||||
.ok_or_eyre("failed to recover senders")?;
|
||||
|
||||
// Create notifications for the above blocks.
|
||||
// 1. Committed notification for blocks with number 0 and 1
|
||||
// 2. Reverted notification for block with number 1
|
||||
// 3. Committed notification for block with number 1 and 2
|
||||
// 4. Reorged notification for block with number 2 that was reverted, and blocks with number
|
||||
// 2 and 3 that were committed
|
||||
let committed_notification_1 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![blocks[0].clone(), blocks[1].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
)),
|
||||
};
|
||||
let reverted_notification = ExExNotification::ChainReverted {
|
||||
old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
|
||||
};
|
||||
let committed_notification_2 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block_1_reorged.clone(), blocks[2].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
)),
|
||||
};
|
||||
let reorged_notification = ExExNotification::ChainReorged {
|
||||
old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block_2_reorged.clone(), blocks[3].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
)),
|
||||
};
|
||||
|
||||
// Commit notifications, verify that the block cache is updated and the notifications are
|
||||
// written to WAL.
|
||||
|
||||
// First notification (commit block 0, 1)
|
||||
let file_id = 0;
|
||||
let committed_notification_1_cache = vec![
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (blocks[0].number, blocks[0].hash()).into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (blocks[1].number, blocks[1].hash()).into(),
|
||||
},
|
||||
),
|
||||
];
|
||||
wal.commit(&committed_notification_1)?;
|
||||
assert_eq!(wal.block_cache.iter().collect::<Vec<_>>(), committed_notification_1_cache);
|
||||
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
|
||||
|
||||
// Second notification (revert block 1)
|
||||
wal.commit(&reverted_notification)?;
|
||||
let file_id = 1;
|
||||
let reverted_notification_cache = vec![(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Revert,
|
||||
block: (blocks[1].number, blocks[1].hash()).into(),
|
||||
},
|
||||
)];
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat()
|
||||
);
|
||||
assert_eq!(
|
||||
read_notifications(&wal)?,
|
||||
vec![committed_notification_1.clone(), reverted_notification.clone()]
|
||||
);
|
||||
|
||||
// Now, rollback to block 1 and verify that both the block cache and the storage are
|
||||
// empty. We expect the rollback to delete the first notification (commit block 0, 1),
|
||||
// because we can't delete blocks partly from the notification, and also the second
|
||||
// notification (revert block 1). Additionally, check that the block that the rolled
|
||||
// back to is the block with number 0.
|
||||
let rollback_result = wal.rollback((blocks[1].number, blocks[1].hash()).into())?;
|
||||
assert_eq!(wal.block_cache.iter().collect::<Vec<_>>(), vec![]);
|
||||
assert_eq!(read_notifications(&wal)?, vec![]);
|
||||
assert_eq!(
|
||||
rollback_result,
|
||||
Some((
|
||||
(blocks[0].number, blocks[0].hash()).into(),
|
||||
vec![committed_notification_1.clone(), reverted_notification.clone()]
|
||||
))
|
||||
);
|
||||
|
||||
// Commit notifications 1 and 2 again
|
||||
wal.commit(&committed_notification_1)?;
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[committed_notification_1_cache.clone()].concat()
|
||||
);
|
||||
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
|
||||
wal.commit(&reverted_notification)?;
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat()
|
||||
);
|
||||
assert_eq!(
|
||||
read_notifications(&wal)?,
|
||||
vec![committed_notification_1.clone(), reverted_notification.clone()]
|
||||
);
|
||||
|
||||
// Third notification (commit block 1, 2)
|
||||
wal.commit(&committed_notification_2)?;
|
||||
let file_id = 2;
|
||||
let committed_notification_2_cache = vec![
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (block_1_reorged.number, block_1_reorged.hash()).into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (blocks[2].number, blocks[2].hash()).into(),
|
||||
},
|
||||
),
|
||||
];
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[
|
||||
committed_notification_1_cache.clone(),
|
||||
reverted_notification_cache.clone(),
|
||||
committed_notification_2_cache.clone()
|
||||
]
|
||||
.concat()
|
||||
);
|
||||
assert_eq!(
|
||||
read_notifications(&wal)?,
|
||||
vec![
|
||||
committed_notification_1.clone(),
|
||||
reverted_notification.clone(),
|
||||
committed_notification_2.clone()
|
||||
]
|
||||
);
|
||||
|
||||
// Fourth notification (revert block 2, commit block 2, 3)
|
||||
wal.commit(&reorged_notification)?;
|
||||
let file_id = 3;
|
||||
let reorged_notification_cache = vec![
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Revert,
|
||||
block: (blocks[2].number, blocks[2].hash()).into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (block_2_reorged.number, block_2_reorged.hash()).into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
file_id,
|
||||
CachedBlock {
|
||||
action: CachedBlockAction::Commit,
|
||||
block: (blocks[3].number, blocks[3].hash()).into(),
|
||||
},
|
||||
),
|
||||
];
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[
|
||||
committed_notification_1_cache,
|
||||
reverted_notification_cache,
|
||||
committed_notification_2_cache.clone(),
|
||||
reorged_notification_cache.clone()
|
||||
]
|
||||
.concat()
|
||||
);
|
||||
assert_eq!(
|
||||
read_notifications(&wal)?,
|
||||
vec![
|
||||
committed_notification_1.clone(),
|
||||
reverted_notification.clone(),
|
||||
committed_notification_2.clone(),
|
||||
reorged_notification.clone()
|
||||
]
|
||||
);
|
||||
|
||||
// Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also
|
||||
// had block 2 committed. In this case, we can't split the notification into two parts, so
|
||||
// we preserve the whole notification in both the block cache and the storage, and delete
|
||||
// the notifications before it.
|
||||
wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
|
||||
assert_eq!(
|
||||
wal.block_cache.iter().collect::<Vec<_>>(),
|
||||
[committed_notification_2_cache, reorged_notification_cache].concat()
|
||||
);
|
||||
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
186
crates/exex/exex/src/wal/storage.rs
Normal file
186
crates/exex/exex/src/wal/storage.rs
Normal file
@ -0,0 +1,186 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Write},
|
||||
ops::RangeInclusive,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use eyre::OptionExt;
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_tracing::tracing::debug;
|
||||
use tracing::instrument;
|
||||
|
||||
/// The underlying WAL storage backed by a directory of files.
|
||||
///
|
||||
/// Each notification is represented by a single file that contains a MessagePack-encoded
|
||||
/// notification.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct Storage {
|
||||
/// The path to the WAL file.
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
/// Creates a new instance of [`Storage`] backed by the file at the given path and creates
|
||||
/// it doesn't exist.
|
||||
pub(super) fn new(path: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
reth_fs_util::create_dir_all(&path)?;
|
||||
|
||||
Ok(Self { path: path.as_ref().to_path_buf() })
|
||||
}
|
||||
|
||||
fn file_path(&self, id: u64) -> PathBuf {
|
||||
self.path.join(format!("{id}.wal"))
|
||||
}
|
||||
|
||||
fn parse_filename(filename: &str) -> eyre::Result<u64> {
|
||||
filename
|
||||
.strip_suffix(".wal")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.ok_or_eyre(format!("failed to parse file name: {filename}"))
|
||||
}
|
||||
|
||||
/// Removes notification for the given file ID from the storage.
|
||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||
fn remove_notification(&self, file_id: u64) {
|
||||
match reth_fs_util::remove_file(self.file_path(file_id)) {
|
||||
Ok(()) => debug!("Notification was removed from the storage"),
|
||||
Err(err) => debug!(?err, "Failed to remove notification from the storage"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the range of file IDs in the storage.
|
||||
///
|
||||
/// If there are no files in the storage, returns `None`.
|
||||
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u64>>> {
|
||||
let mut min_id = None;
|
||||
let mut max_id = None;
|
||||
|
||||
for entry in reth_fs_util::read_dir(&self.path)? {
|
||||
let entry = entry?;
|
||||
let file_name = entry.file_name();
|
||||
let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
|
||||
|
||||
min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id)));
|
||||
max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id)));
|
||||
}
|
||||
|
||||
Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
|
||||
}
|
||||
|
||||
/// Removes notifications from the storage according to the given range.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Number of removed notifications.
|
||||
pub(super) fn remove_notifications(&self, range: RangeInclusive<u64>) -> eyre::Result<usize> {
|
||||
for id in range.clone() {
|
||||
self.remove_notification(id);
|
||||
}
|
||||
|
||||
Ok(range.count())
|
||||
}
|
||||
|
||||
/// Removes notifications from the storage according to the given range.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Notifications that were removed.
|
||||
pub(super) fn take_notifications(
|
||||
&self,
|
||||
range: RangeInclusive<u64>,
|
||||
) -> eyre::Result<Vec<ExExNotification>> {
|
||||
let notifications = self.iter_notifications(range).collect::<eyre::Result<Vec<_>>>()?;
|
||||
|
||||
for (id, _) in ¬ifications {
|
||||
self.remove_notification(*id);
|
||||
}
|
||||
|
||||
Ok(notifications.into_iter().map(|(_, notification)| notification).collect())
|
||||
}
|
||||
|
||||
pub(super) fn iter_notifications(
|
||||
&self,
|
||||
range: RangeInclusive<u64>,
|
||||
) -> impl Iterator<Item = eyre::Result<(u64, ExExNotification)>> + '_ {
|
||||
range.map(move |id| self.read_notification(id).map(|notification| (id, notification)))
|
||||
}
|
||||
|
||||
/// Reads the notification from the file with the given id.
|
||||
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<ExExNotification> {
|
||||
debug!(?file_id, "Reading notification from WAL");
|
||||
|
||||
let file_path = self.file_path(file_id);
|
||||
let mut file = File::open(&file_path)?;
|
||||
read_notification(&mut file)
|
||||
}
|
||||
|
||||
/// Writes the notification to the file with the given id.
|
||||
pub(super) fn write_notification(
|
||||
&self,
|
||||
file_id: u64,
|
||||
notification: &ExExNotification,
|
||||
) -> eyre::Result<()> {
|
||||
debug!(?file_id, "Writing notification to WAL");
|
||||
|
||||
let file_path = self.file_path(file_id);
|
||||
let mut file = File::create_new(&file_path)?;
|
||||
write_notification(&mut file, notification)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
|
||||
|
||||
fn write_notification(mut w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> {
|
||||
// rmp_serde::encode::write(w, notification)?;
|
||||
serde_json::to_writer(&mut w, notification)?;
|
||||
w.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_notification(r: &mut impl Read) -> eyre::Result<ExExNotification> {
|
||||
// Ok(rmp_serde::from_read(r)?)
|
||||
Ok(serde_json::from_reader(r)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use eyre::OptionExt;
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_provider::Chain;
|
||||
use reth_testing_utils::generators::{self, random_block};
|
||||
|
||||
use super::Storage;
|
||||
|
||||
#[test]
|
||||
fn test_roundtrip() -> eyre::Result<()> {
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let temp_dir = tempfile::tempdir()?;
|
||||
let storage = Storage::new(&temp_dir)?;
|
||||
|
||||
let old_block = random_block(&mut rng, 0, Default::default())
|
||||
.seal_with_senders()
|
||||
.ok_or_eyre("failed to recover senders")?;
|
||||
let new_block = random_block(&mut rng, 0, Default::default())
|
||||
.seal_with_senders()
|
||||
.ok_or_eyre("failed to recover senders")?;
|
||||
|
||||
let notification = ExExNotification::ChainReorged {
|
||||
new: Arc::new(Chain::new(vec![new_block], Default::default(), None)),
|
||||
old: Arc::new(Chain::new(vec![old_block], Default::default(), None)),
|
||||
};
|
||||
|
||||
// Do a round trip serialization and deserialization
|
||||
let file_id = 0;
|
||||
storage.write_notification(file_id, ¬ification)?;
|
||||
let deserialized_notification = storage.read_notification(file_id)?;
|
||||
assert_eq!(deserialized_notification, notification);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -36,7 +36,7 @@ pub struct BlockParams {
|
||||
}
|
||||
|
||||
/// Used to pass arguments for random block generation function in tests
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
pub struct BlockRangeParams {
|
||||
/// The parent hash of the block.
|
||||
pub parent: Option<B256>,
|
||||
@ -50,6 +50,17 @@ pub struct BlockRangeParams {
|
||||
pub withdrawals_count: Option<Range<u8>>,
|
||||
}
|
||||
|
||||
impl Default for BlockRangeParams {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
parent: None,
|
||||
tx_count: 0..u8::MAX / 2,
|
||||
requests_count: None,
|
||||
withdrawals_count: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a random number generator that can be seeded using the `SEED` environment variable.
|
||||
///
|
||||
/// If `SEED` is not set, a random seed is used.
|
||||
|
||||
Reference in New Issue
Block a user