feat(exex): WAL handle (#11266)

This commit is contained in:
Alexey Shekhirin
2024-09-27 10:10:35 +01:00
committed by GitHub
parent 37b0c56b85
commit 67221247c5
9 changed files with 212 additions and 121 deletions

1
Cargo.lock generated
View File

@ -7400,6 +7400,7 @@ dependencies = [
"eyre", "eyre",
"futures", "futures",
"metrics", "metrics",
"parking_lot 0.12.3",
"reth-blockchain-tree", "reth-blockchain-tree",
"reth-chain-state", "reth-chain-state",
"reth-chainspec", "reth-chainspec",

View File

@ -44,6 +44,7 @@ tokio.workspace = true
dashmap.workspace = true dashmap.workspace = true
eyre.workspace = true eyre.workspace = true
metrics.workspace = true metrics.workspace = true
parking_lot.workspace = true
serde_json.workspace = true serde_json.workspace = true
tracing.workspace = true tracing.workspace = true

View File

@ -1,4 +1,6 @@
use crate::{wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight}; use crate::{
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
};
use alloy_primitives::BlockNumber; use alloy_primitives::BlockNumber;
use futures::StreamExt; use futures::StreamExt;
use metrics::Gauge; use metrics::Gauge;
@ -67,10 +69,12 @@ impl ExExHandle {
node_head: Head, node_head: Head,
provider: P, provider: P,
executor: E, executor: E,
wal_handle: WalHandle,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) { ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
let (notification_tx, notification_rx) = mpsc::channel(1); let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel(); let (event_tx, event_rx) = mpsc::unbounded_channel();
let notifications = ExExNotifications::new(node_head, provider, executor, notification_rx); let notifications =
ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
( (
Self { Self {
@ -521,8 +525,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_delivers_events() { async fn test_delivers_events() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, event_tx, mut _notification_rx) = let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Send an event and check that it's delivered correctly // Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -533,65 +540,48 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_has_exexs() { async fn test_has_exexs() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle_1, _, _) = let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
assert!(!ExExManager::new( assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
vec![], .handle
0, .has_exexs());
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
assert!(ExExManager::new( assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
vec![exex_handle_1], .handle
0, .has_exexs());
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
} }
#[tokio::test] #[tokio::test]
async fn test_has_capacity() { async fn test_has_capacity() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle_1, _, _) = let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
assert!(!ExExManager::new( assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
vec![], .handle
0, .has_capacity());
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
assert!(ExExManager::new( assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream())
vec![exex_handle_1], .handle
10, .has_capacity());
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
} }
#[test] #[test]
fn test_push_notification() { fn test_push_notification() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Create a mock ExExManager and add the exex_handle to it // Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new( let mut exex_manager =
vec![exex_handle], ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream());
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Define the notification for testing // Define the notification for testing
let mut block1 = SealedBlockWithSenders::default(); let mut block1 = SealedBlockWithSenders::default();
@ -634,16 +624,15 @@ mod tests {
#[test] #[test]
fn test_update_capacity() { fn test_update_capacity() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Create a mock ExExManager and add the exex_handle to it // Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5; let max_capacity = 5;
let mut exex_manager = ExExManager::new( let mut exex_manager =
vec![exex_handle], ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream());
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Push some notifications to fill part of the buffer // Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default(); let mut block1 = SealedBlockWithSenders::default();
@ -674,8 +663,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height() { async fn test_updates_block_height() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle, event_tx, mut _notification_rx) = let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Check initial block height // Check initial block height
assert!(exex_handle.finished_height.is_none()); assert!(exex_handle.finished_height.is_none());
@ -717,11 +708,13 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height_lower() { async fn test_updates_block_height_lower() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
// Create two `ExExHandle` instances // Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) = let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
// Send events to update the block heights of the two handles, with the second being lower // Send events to update the block heights of the two handles, with the second being lower
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -756,11 +749,13 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height_greater() { async fn test_updates_block_height_greater() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
// Create two `ExExHandle` instances // Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) = let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
// Assert that the initial block height is `None` for the first `ExExHandle`. // Assert that the initial block height is `None` for the first `ExExHandle`.
assert!(exex_handle1.finished_height.is_none()); assert!(exex_handle1.finished_height.is_none());
@ -802,8 +797,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_exex_manager_capacity() { async fn test_exex_manager_capacity() {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle_1, _, _) = let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
// Create an ExExManager with a small max capacity // Create an ExExManager with a small max capacity
let max_capacity = 2; let max_capacity = 2;
@ -846,8 +843,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn exex_handle_new() { async fn exex_handle_new() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Check initial state // Check initial state
assert_eq!(exex_handle.id, "test_exex"); assert_eq!(exex_handle.id, "test_exex");
@ -889,8 +889,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() { async fn test_notification_if_finished_height_gt_chain_tip() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// Set finished_height to a value higher than the block tip // Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15); exex_handle.finished_height = Some(15);
@ -931,8 +934,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_sends_chain_reorged_notification() { async fn test_sends_chain_reorged_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let notification = ExExNotification::ChainReorged { let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()), old: Arc::new(Chain::default()),
@ -962,8 +968,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_sends_chain_reverted_notification() { async fn test_sends_chain_reverted_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
@ -994,6 +1003,7 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap(); let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap(); let mut wal = Wal::new(temp_dir.path()).unwrap();
let block = random_block(&mut generators::rng(), 0, Default::default()) let block = random_block(&mut generators::rng(), 0, Default::default())
.seal_with_senders() .seal_with_senders()
.ok_or_eyre("failed to recover senders")?; .ok_or_eyre("failed to recover senders")?;
@ -1005,7 +1015,8 @@ mod tests {
let (tx, rx) = watch::channel(None); let (tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx); let finalized_header_stream = ForkChoiceStream::new(rx);
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let mut exex_manager = let mut exex_manager =
std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream)); std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream));

View File

@ -1,4 +1,4 @@
use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob}; use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
use alloy_primitives::U256; use alloy_primitives::U256;
use eyre::OptionExt; use eyre::OptionExt;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
@ -21,6 +21,7 @@ pub struct ExExNotifications<P, E> {
provider: P, provider: P,
executor: E, executor: E,
notifications: Receiver<ExExNotification>, notifications: Receiver<ExExNotification>,
wal_handle: WalHandle,
} }
impl<P: Debug, E: Debug> Debug for ExExNotifications<P, E> { impl<P: Debug, E: Debug> Debug for ExExNotifications<P, E> {
@ -40,8 +41,9 @@ impl<P, E> ExExNotifications<P, E> {
provider: P, provider: P,
executor: E, executor: E,
notifications: Receiver<ExExNotification>, notifications: Receiver<ExExNotification>,
wal_handle: WalHandle,
) -> Self { ) -> Self {
Self { node_head, provider, executor, notifications } Self { node_head, provider, executor, notifications, wal_handle }
} }
/// Receives the next value for this receiver. /// Receives the next value for this receiver.
@ -113,6 +115,7 @@ where
self.provider, self.provider,
self.executor, self.executor,
self.notifications, self.notifications,
self.wal_handle,
head, head,
) )
} }
@ -134,6 +137,8 @@ pub struct ExExNotificationsWithHead<P, E> {
provider: P, provider: P,
executor: E, executor: E,
notifications: Receiver<ExExNotification>, notifications: Receiver<ExExNotification>,
#[allow(dead_code)]
wal_handle: WalHandle,
exex_head: ExExHead, exex_head: ExExHead,
pending_sync: bool, pending_sync: bool,
/// The backfill job to run before consuming any notifications. /// The backfill job to run before consuming any notifications.
@ -154,6 +159,7 @@ where
provider: P, provider: P,
executor: E, executor: E,
notifications: Receiver<ExExNotification>, notifications: Receiver<ExExNotification>,
wal_handle: WalHandle,
exex_head: ExExHead, exex_head: ExExHead,
) -> Self { ) -> Self {
Self { Self {
@ -161,6 +167,7 @@ where
provider, provider,
executor, executor,
notifications, notifications,
wal_handle,
exex_head, exex_head,
pending_sync: true, pending_sync: true,
backfill_job: None, backfill_job: None,
@ -344,6 +351,8 @@ where
mod tests { mod tests {
use std::future::poll_fn; use std::future::poll_fn;
use crate::Wal;
use super::*; use super::*;
use alloy_consensus::Header; use alloy_consensus::Header;
use eyre::OptionExt; use eyre::OptionExt;
@ -362,6 +371,9 @@ mod tests {
async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
let mut rng = generators::rng(); let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory(); let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?; let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory let genesis_block = provider_factory
@ -412,6 +424,7 @@ mod tests {
provider, provider,
EthExecutorProvider::mainnet(), EthExecutorProvider::mainnet(),
notifications_rx, notifications_rx,
wal.handle(),
) )
.with_head(exex_head); .with_head(exex_head);
@ -445,6 +458,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory(); let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?; let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory let genesis_block = provider_factory
@ -485,6 +501,7 @@ mod tests {
provider, provider,
EthExecutorProvider::mainnet(), EthExecutorProvider::mainnet(),
notifications_rx, notifications_rx,
wal.handle(),
) )
.with_head(exex_head); .with_head(exex_head);
@ -504,6 +521,9 @@ mod tests {
async fn test_notifications_ahead_of_head() -> eyre::Result<()> { async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
let mut rng = generators::rng(); let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory(); let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?; let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory let genesis_block = provider_factory
@ -544,6 +564,7 @@ mod tests {
provider, provider,
EthExecutorProvider::mainnet(), EthExecutorProvider::mainnet(),
notifications_rx, notifications_rx,
wal.handle(),
) )
.with_head(exex_head); .with_head(exex_head);

View File

@ -1,6 +1,7 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
use dashmap::DashMap; use dashmap::DashMap;
use parking_lot::RwLock;
use reth_exex_types::ExExNotification; use reth_exex_types::ExExNotification;
use reth_primitives::{BlockNumHash, B256}; use reth_primitives::{BlockNumHash, B256};
@ -15,7 +16,7 @@ pub struct BlockCache {
/// For each notification written to the WAL, there will be an entry per block written to /// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks /// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
/// in the cache. /// in the cache.
files: BTreeMap<u64, VecDeque<CachedBlock>>, files: RwLock<BTreeMap<u64, VecDeque<CachedBlock>>>,
/// A mapping of `Block Hash -> Block`. /// A mapping of `Block Hash -> Block`.
/// ///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
@ -26,45 +27,52 @@ pub struct BlockCache {
impl BlockCache { impl BlockCache {
/// Creates a new instance of [`BlockCache`]. /// Creates a new instance of [`BlockCache`].
pub(super) fn new() -> Self { pub(super) fn new() -> Self {
Self { files: BTreeMap::new(), blocks: DashMap::new() } Self { files: RwLock::new(BTreeMap::new()), blocks: DashMap::new() }
} }
/// Returns `true` if the cache is empty. /// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool { pub(super) fn is_empty(&self) -> bool {
self.files.is_empty() self.files.read().is_empty()
} }
/// Returns a front-to-back iterator. /// Returns a front-to-back iterator.
pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ { pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ {
self.files.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b))) self.files
.read()
.iter()
.flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
.collect::<Vec<_>>()
.into_iter()
} }
/// Provides a reference to the first block from the cache, or `None` if the cache is /// Provides a reference to the first block from the cache, or `None` if the cache is
/// empty. /// empty.
pub(super) fn front(&self) -> Option<(u64, CachedBlock)> { pub(super) fn front(&self) -> Option<(u64, CachedBlock)> {
self.files.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b))) self.files.read().first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
} }
/// Provides a reference to the last block from the cache, or `None` if the cache is /// Provides a reference to the last block from the cache, or `None` if the cache is
/// empty. /// empty.
pub(super) fn back(&self) -> Option<(u64, CachedBlock)> { pub(super) fn back(&self) -> Option<(u64, CachedBlock)> {
self.files.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b))) self.files.read().last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
} }
/// Removes the notification with the given file ID. /// Removes the notification with the given file ID.
pub(super) fn remove_notification(&mut self, key: u64) -> Option<VecDeque<CachedBlock>> { pub(super) fn remove_notification(&self, key: u64) -> Option<VecDeque<CachedBlock>> {
self.files.remove(&key) self.files.write().remove(&key)
} }
/// Pops the first block from the cache. If it resulted in the whole file entry being empty, /// Pops the first block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry. /// it will also remove the file entry.
pub(super) fn pop_front(&mut self) -> Option<(u64, CachedBlock)> { pub(super) fn pop_front(&self) -> Option<(u64, CachedBlock)> {
let first_entry = self.files.first_entry()?; let mut files = self.files.write();
let first_entry = files.first_entry()?;
let key = *first_entry.key(); let key = *first_entry.key();
let blocks = first_entry.into_mut(); let blocks = first_entry.into_mut();
let first_block = blocks.pop_front().unwrap(); let first_block = blocks.pop_front().unwrap();
if blocks.is_empty() { if blocks.is_empty() {
self.files.remove(&key); files.remove(&key);
} }
Some((key, first_block)) Some((key, first_block))
@ -72,44 +80,40 @@ impl BlockCache {
/// Pops the last block from the cache. If it resulted in the whole file entry being empty, /// Pops the last block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry. /// it will also remove the file entry.
pub(super) fn pop_back(&mut self) -> Option<(u64, CachedBlock)> { pub(super) fn pop_back(&self) -> Option<(u64, CachedBlock)> {
let last_entry = self.files.last_entry()?; let mut files = self.files.write();
let last_entry = files.last_entry()?;
let key = *last_entry.key(); let key = *last_entry.key();
let blocks = last_entry.into_mut(); let blocks = last_entry.into_mut();
let last_block = blocks.pop_back().unwrap(); let last_block = blocks.pop_back().unwrap();
if blocks.is_empty() { if blocks.is_empty() {
self.files.remove(&key); files.remove(&key);
} }
Some((key, last_block)) Some((key, last_block))
} }
/// Appends a block to the back of the specified file entry.
pub(super) fn insert(&mut self, file_id: u64, block: CachedBlock) {
self.files.entry(file_id).or_default().push_back(block);
}
/// Inserts the blocks from the notification into the cache with the given file ID. /// Inserts the blocks from the notification into the cache with the given file ID.
/// ///
/// First, inserts the reverted blocks (if any), then the committed blocks (if any). /// First, inserts the reverted blocks (if any), then the committed blocks (if any).
pub(super) fn insert_notification_blocks_with_file_id( pub(super) fn insert_notification_blocks_with_file_id(
&mut self, &self,
file_id: u64, file_id: u64,
notification: &ExExNotification, notification: &ExExNotification,
) { ) {
let mut files = self.files.write();
let reverted_chain = notification.reverted_chain(); let reverted_chain = notification.reverted_chain();
let committed_chain = notification.committed_chain(); let committed_chain = notification.committed_chain();
if let Some(reverted_chain) = reverted_chain { if let Some(reverted_chain) = reverted_chain {
for block in reverted_chain.blocks().values() { for block in reverted_chain.blocks().values() {
self.insert( files.entry(file_id).or_default().push_back(CachedBlock {
file_id, action: CachedBlockAction::Revert,
CachedBlock { block: (block.number, block.hash()).into(),
action: CachedBlockAction::Revert, parent_hash: block.parent_hash,
block: (block.number, block.hash()).into(), });
parent_hash: block.parent_hash,
},
);
} }
} }
@ -120,7 +124,7 @@ impl BlockCache {
block: (block.number, block.hash()).into(), block: (block.number, block.hash()).into(),
parent_hash: block.parent_hash, parent_hash: block.parent_hash,
}; };
self.insert(file_id, cached_block); files.entry(file_id).or_default().push_back(cached_block);
self.blocks.insert(block.hash(), cached_block); self.blocks.insert(block.hash(), cached_block);
} }
} }

View File

@ -5,7 +5,7 @@ pub use cache::BlockCache;
mod storage; mod storage;
pub use storage::Storage; pub use storage::Storage;
use std::path::Path; use std::{path::Path, sync::Arc};
use reth_exex_types::ExExNotification; use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash; use reth_primitives::BlockNumHash;
@ -15,23 +15,62 @@ use reth_tracing::tracing::{debug, instrument};
/// ///
/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache /// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory /// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory
/// and decoding notifications every time we want to rollback/finalize the WAL. /// and decoding notifications every time we want to iterate or finalize the WAL.
/// ///
/// The expected mode of operation is as follows: /// The expected mode of operation is as follows:
/// 1. On every new canonical chain notification, call [`Wal::commit`]. /// 1. On every new canonical chain notification, call [`Wal::commit`].
/// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the /// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
/// WAL. /// WAL.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Wal { pub struct Wal {
inner: Arc<WalInner>,
}
impl Wal {
/// Creates a new instance of [`Wal`].
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
}
/// Returns a read-only handle to the WAL.
pub fn handle(&self) -> WalHandle {
WalHandle { wal: self.inner.clone() }
}
/// Commits the notification to WAL.
pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
self.inner.commit(notification)
}
/// Finalizes the WAL to the given block, inclusive.
///
/// 1. Finds a notification with first unfinalized block (first notification containing a
/// committed block higher than `to_block`).
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this
/// notification includes both finalized and non-finalized blocks, it will not be removed.
pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
self.inner.finalize(to_block)
}
/// Returns an iterator over all notifications in the WAL.
pub fn iter_notifications(
&self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
self.inner.iter_notifications()
}
}
/// Inner type for the WAL.
#[derive(Debug)]
struct WalInner {
/// The underlying WAL storage backed by a file. /// The underlying WAL storage backed by a file.
storage: Storage, storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details. /// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: BlockCache, block_cache: BlockCache,
} }
impl Wal { impl WalInner {
/// Creates a new instance of [`Wal`]. fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() }; let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() };
wal.fill_block_cache()?; wal.fill_block_cache()?;
Ok(wal) Ok(wal)
@ -62,12 +101,11 @@ impl Wal {
Ok(()) Ok(())
} }
/// Commits the notification to WAL.
#[instrument(target = "exex::wal", skip_all, fields( #[instrument(target = "exex::wal", skip_all, fields(
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))] ))]
pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1); let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1);
self.storage.write_notification(file_id, notification)?; self.storage.write_notification(file_id, notification)?;
@ -84,7 +122,7 @@ impl Wal {
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this /// 2. Removes the notifications from the beginning of WAL until the found notification. If this
/// notification includes both finalized and non-finalized blocks, it will not be removed. /// notification includes both finalized and non-finalized blocks, it will not be removed.
#[instrument(target = "exex::wal", skip(self))] #[instrument(target = "exex::wal", skip(self))]
pub fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> { fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
// First, walk cache to find the file ID of the notification with the finalized block and // First, walk cache to find the file ID of the notification with the finalized block and
// save the file ID with the first unfinalized block. Do not remove any notifications // save the file ID with the first unfinalized block. Do not remove any notifications
// yet. // yet.
@ -152,7 +190,7 @@ impl Wal {
} }
/// Returns an iterator over all notifications in the WAL. /// Returns an iterator over all notifications in the WAL.
pub(crate) fn iter_notifications( fn iter_notifications(
&self, &self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> { ) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
let Some(range) = self.storage.files_range()? else { let Some(range) = self.storage.files_range()? else {
@ -163,6 +201,12 @@ impl Wal {
} }
} }
/// A read-only handle to the WAL that can be shared.
#[derive(Debug)]
pub struct WalHandle {
wal: Arc<WalInner>,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;
@ -180,9 +224,10 @@ mod tests {
}; };
fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> { fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
let Some(files_range) = wal.storage.files_range()? else { return Ok(Vec::new()) }; let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) };
wal.storage wal.inner
.storage
.iter_notifications(files_range) .iter_notifications(files_range)
.map(|entry| Ok(entry?.1)) .map(|entry| Ok(entry?.1))
.collect::<eyre::Result<_>>() .collect::<eyre::Result<_>>()
@ -197,7 +242,7 @@ mod tests {
// Create an instance of the WAL in a temporary directory // Create an instance of the WAL in a temporary directory
let temp_dir = tempfile::tempdir()?; let temp_dir = tempfile::tempdir()?;
let mut wal = Wal::new(&temp_dir)?; let mut wal = Wal::new(&temp_dir)?;
assert!(wal.block_cache.is_empty()); assert!(wal.inner.block_cache.is_empty());
// Create 4 canonical blocks and one reorged block with number 2 // Create 4 canonical blocks and one reorged block with number 2
let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default()) let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
@ -275,7 +320,10 @@ mod tests {
), ),
]; ];
wal.commit(&committed_notification_1)?; wal.commit(&committed_notification_1)?;
assert_eq!(wal.block_cache.iter().collect::<Vec<_>>(), committed_notification_1_cache); assert_eq!(
wal.inner.block_cache.iter().collect::<Vec<_>>(),
committed_notification_1_cache
);
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
// Second notification (revert block 1) // Second notification (revert block 1)
@ -290,7 +338,7 @@ mod tests {
}, },
)]; )];
assert_eq!( assert_eq!(
wal.block_cache.iter().collect::<Vec<_>>(), wal.inner.block_cache.iter().collect::<Vec<_>>(),
[committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat() [committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat()
); );
assert_eq!( assert_eq!(
@ -320,7 +368,7 @@ mod tests {
), ),
]; ];
assert_eq!( assert_eq!(
wal.block_cache.iter().collect::<Vec<_>>(), wal.inner.block_cache.iter().collect::<Vec<_>>(),
[ [
committed_notification_1_cache.clone(), committed_notification_1_cache.clone(),
reverted_notification_cache.clone(), reverted_notification_cache.clone(),
@ -367,7 +415,7 @@ mod tests {
), ),
]; ];
assert_eq!( assert_eq!(
wal.block_cache.iter().collect::<Vec<_>>(), wal.inner.block_cache.iter().collect::<Vec<_>>(),
[ [
committed_notification_1_cache, committed_notification_1_cache,
reverted_notification_cache, reverted_notification_cache,
@ -392,7 +440,7 @@ mod tests {
// the notifications before it. // the notifications before it.
wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?; wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
assert_eq!( assert_eq!(
wal.block_cache.iter().collect::<Vec<_>>(), wal.inner.block_cache.iter().collect::<Vec<_>>(),
[committed_notification_2_cache, reorged_notification_cache].concat() [committed_notification_2_cache, reorged_notification_cache].concat()
); );
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]); assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);

View File

@ -13,7 +13,7 @@ use tracing::instrument;
/// ///
/// Each notification is represented by a single file that contains a MessagePack-encoded /// Each notification is represented by a single file that contains a MessagePack-encoded
/// notification. /// notification.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Storage { pub struct Storage {
/// The path to the WAL file. /// The path to the WAL file.
path: PathBuf, path: PathBuf,

View File

@ -20,7 +20,7 @@ use reth_db_common::init::init_genesis;
use reth_ethereum_engine_primitives::EthereumEngineValidator; use reth_ethereum_engine_primitives::EthereumEngineValidator;
use reth_evm::test_utils::MockExecutorProvider; use reth_evm::test_utils::MockExecutorProvider;
use reth_execution_types::Chain; use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal};
use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager};
use reth_node_api::{ use reth_node_api::{
FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine,
@ -49,6 +49,7 @@ use reth_provider::{
use reth_tasks::TaskManager; use reth_tasks::TaskManager;
use reth_transaction_pool::test_utils::{testing_pool, TestPool}; use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use std::{ use std::{
env::temp_dir,
fmt::Debug, fmt::Debug,
future::{poll_fn, Future}, future::{poll_fn, Future},
sync::Arc, sync::Arc,
@ -310,6 +311,8 @@ pub async fn test_exex_context_with_chain_spec(
components.provider.clone(), components.provider.clone(),
components.components.executor.clone(), components.components.executor.clone(),
notifications_rx, notifications_rx,
// TODO(alexey): do we want to expose WAL to the user?
Wal::new(temp_dir())?.handle(),
); );
let ctx = ExExContext { let ctx = ExExContext {

View File

@ -45,6 +45,15 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
return Ok(None) return Ok(None)
} }
let exex_wal = Wal::new(
config_container
.config
.datadir
.clone()
.resolve_datadir(config_container.config.chain.chain())
.exex_wal(),
)?;
let mut exex_handles = Vec::with_capacity(extensions.len()); let mut exex_handles = Vec::with_capacity(extensions.len());
let mut exexes = Vec::with_capacity(extensions.len()); let mut exexes = Vec::with_capacity(extensions.len());
@ -55,6 +64,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
head, head,
components.provider().clone(), components.provider().clone(),
components.block_executor().clone(), components.block_executor().clone(),
exex_wal.handle(),
); );
exex_handles.push(handle); exex_handles.push(handle);
@ -96,14 +106,6 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
// spawn exex manager // spawn exex manager
debug!(target: "reth::cli", "spawning exex manager"); debug!(target: "reth::cli", "spawning exex manager");
// todo(onbjerg): rm magic number // todo(onbjerg): rm magic number
let exex_wal = Wal::new(
config_container
.config
.datadir
.clone()
.resolve_datadir(config_container.config.chain.chain())
.exex_wal(),
)?;
let exex_manager = ExExManager::new( let exex_manager = ExExManager::new(
exex_handles, exex_handles,
1024, 1024,