feat(exex): finalize ExEx WAL on new finalized block header (#11174)

This commit is contained in:
Alexey Shekhirin
2024-09-25 12:53:51 +01:00
committed by GitHub
parent 5d2867f2c5
commit 2224e6c48b
12 changed files with 213 additions and 37 deletions

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies]
## reth
reth-chain-state.workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true

View File

@ -47,6 +47,7 @@ mod manager;
pub use manager::*;
mod wal;
pub use wal::*;
// Re-export exex types
#[doc(inline)]

View File

@ -1,14 +1,17 @@
use crate::{
BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob,
wal::Wal, BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight,
StreamBackfillJob,
};
use alloy_primitives::{BlockNumber, U256};
use eyre::OptionExt;
use futures::{Stream, StreamExt};
use metrics::Gauge;
use reth_chain_state::ForkChoiceStream;
use reth_chainspec::Head;
use reth_evm::execute::BlockExecutorProvider;
use reth_exex_types::ExExHead;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_tracing::tracing::debug;
use std::{
@ -530,6 +533,11 @@ pub struct ExExManager {
/// The finished height of all `ExEx`'s.
finished_height: watch::Sender<FinishedExExHeight>,
/// Write-Ahead Log for the [`ExExNotification`]s.
wal: Wal,
/// A stream of finalized headers.
finalized_header_stream: ForkChoiceStream<SealedHeader>,
/// A handle to the `ExEx` manager.
handle: ExExManagerHandle,
/// Metrics for the `ExEx` manager.
@ -544,7 +552,12 @@ impl ExExManager {
///
/// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
/// notifications over [`ExExManagerHandle`]s until there is capacity again.
pub fn new(handles: Vec<ExExHandle>, max_capacity: usize) -> Self {
pub fn new(
handles: Vec<ExExHandle>,
max_capacity: usize,
wal: Wal,
finalized_header_stream: ForkChoiceStream<SealedHeader>,
) -> Self {
let num_exexs = handles.len();
let (handle_tx, handle_rx) = mpsc::unbounded_channel();
@ -575,6 +588,9 @@ impl ExExManager {
is_ready: is_ready_tx,
finished_height: finished_height_tx,
wal,
finalized_header_stream,
handle: ExExManagerHandle {
exex_tx: handle_tx,
num_exexs,
@ -618,6 +634,16 @@ impl Future for ExExManager {
type Output = eyre::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Drain the finalized header stream and grab the last finalized header
let mut last_finalized_header = None;
while let Poll::Ready(finalized_header) = self.finalized_header_stream.poll_next_unpin(cx) {
last_finalized_header = finalized_header;
}
// If there is a finalized header, finalize the WAL with it
if let Some(header) = last_finalized_header {
self.wal.finalize((header.number, header.hash()).into())?;
}
// drain handle notifications
while self.buffer.len() < self.max_capacity {
if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) {
@ -820,6 +846,13 @@ mod tests {
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
// Do not drop the sender, otherwise the receiver will always return an error
std::mem::forget(tx);
ForkChoiceStream::new(rx)
}
#[tokio::test]
async fn test_delivers_events() {
let (mut exex_handle, event_tx, mut _notification_rx) =
@ -833,30 +866,66 @@ mod tests {
#[tokio::test]
async fn test_has_exexs() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
assert!(!ExExManager::new(vec![], 0).handle.has_exexs());
assert!(!ExExManager::new(
vec![],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
assert!(ExExManager::new(vec![exex_handle_1], 0).handle.has_exexs());
assert!(ExExManager::new(
vec![exex_handle_1],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
}
#[tokio::test]
async fn test_has_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
assert!(!ExExManager::new(vec![], 0).handle.has_capacity());
assert!(!ExExManager::new(
vec![],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
assert!(ExExManager::new(vec![exex_handle_1], 10).handle.has_capacity());
assert!(ExExManager::new(
vec![exex_handle_1],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
}
#[test]
fn test_push_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(vec![exex_handle], 10);
let mut exex_manager = ExExManager::new(
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
@ -898,11 +967,17 @@ mod tests {
#[test]
fn test_update_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
let mut exex_manager = ExExManager::new(vec![exex_handle], max_capacity);
let mut exex_manager = ExExManager::new(
vec![exex_handle],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
@ -932,6 +1007,7 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
@ -942,7 +1018,12 @@ mod tests {
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(vec![exex_handle], 10);
let exex_manager = ExExManager::new(
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@ -969,6 +1050,7 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_lower() {
let temp_dir = tempfile::tempdir().unwrap();
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
@ -979,7 +1061,12 @@ mod tests {
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(10)).unwrap();
let exex_manager = ExExManager::new(vec![exex_handle1, exex_handle2], 10);
let exex_manager = ExExManager::new(
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@ -1002,6 +1089,7 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_greater() {
let temp_dir = tempfile::tempdir().unwrap();
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
@ -1015,7 +1103,12 @@ mod tests {
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(100)).unwrap();
let exex_manager = ExExManager::new(vec![exex_handle1, exex_handle2], 10);
let exex_manager = ExExManager::new(
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@ -1042,12 +1135,18 @@ mod tests {
#[tokio::test]
async fn test_exex_manager_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
// Create an ExExManager with a small max capacity
let max_capacity = 2;
let mut exex_manager = ExExManager::new(vec![exex_handle_1], max_capacity);
let mut exex_manager = ExExManager::new(
vec![exex_handle_1],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@ -1223,6 +1322,44 @@ mod tests {
assert_eq!(exex_handle.next_notification_id, 23);
}
#[tokio::test]
async fn test_exex_wal_finalize() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let block = random_block(&mut generators::rng(), 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
wal.commit(&notification)?;
let (tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let mut exex_manager =
std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification]
);
tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
Ok(())
}
#[tokio::test]
async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
let mut rng = generators::rng();

View File

@ -12,7 +12,7 @@ use reth_primitives::BlockNumHash;
/// This cache is needed to avoid walking the WAL directory every time we want to find a
/// notification corresponding to a block.
#[derive(Debug)]
pub(super) struct BlockCache(BTreeMap<u64, VecDeque<CachedBlock>>);
pub struct BlockCache(BTreeMap<u64, VecDeque<CachedBlock>>);
impl BlockCache {
/// Creates a new instance of [`BlockCache`].

View File

@ -1,17 +1,17 @@
#![allow(dead_code)]
mod cache;
pub use cache::BlockCache;
mod storage;
pub use storage::Storage;
use std::path::Path;
use cache::BlockCache;
use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;
use reth_tracing::tracing::{debug, instrument};
use storage::Storage;
/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx.
/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
///
/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory
@ -26,7 +26,7 @@ use storage::Storage;
/// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
/// WAL.
#[derive(Debug)]
pub(crate) struct Wal {
pub struct Wal {
/// The underlying WAL storage backed by a file.
storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
@ -35,7 +35,7 @@ pub(crate) struct Wal {
impl Wal {
/// Creates a new instance of [`Wal`].
pub(crate) fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() };
wal.fill_block_cache()?;
Ok(wal)
@ -71,8 +71,7 @@ impl Wal {
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))]
pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
debug!("Writing notification to WAL");
pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1);
self.storage.write_notification(file_id, notification)?;
@ -94,7 +93,7 @@ impl Wal {
/// 1. The block number and hash of the lowest removed block.
/// 2. The notifications that were removed.
#[instrument(target = "exex::wal", skip(self))]
pub(crate) fn rollback(
pub fn rollback(
&mut self,
to_block: BlockNumHash,
) -> eyre::Result<Option<(BlockNumHash, Vec<ExExNotification>)>> {
@ -162,9 +161,9 @@ impl Wal {
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this
/// notification includes both finalized and non-finalized blocks, it will not be removed.
#[instrument(target = "exex::wal", skip(self))]
pub(crate) fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> {
pub fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> {
// First, walk cache to find the file ID of the notification with the finalized block and
// save the file ID with the last unfinalized block. Do not remove any notifications
// save the file ID with the first unfinalized block. Do not remove any notifications
// yet.
let mut unfinalized_from_file_id = None;
{
@ -177,7 +176,9 @@ impl Wal {
{
let notification = self.storage.read_notification(file_id)?;
if notification.committed_chain().unwrap().blocks().len() == 1 {
unfinalized_from_file_id = block_cache.peek().map(|(file_id, _)| *file_id);
unfinalized_from_file_id = Some(
block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX),
);
} else {
unfinalized_from_file_id = Some(file_id);
}
@ -226,6 +227,17 @@ impl Wal {
Ok(())
}
/// Returns an iterator over all notifications in the WAL.
pub(crate) fn iter_notifications(
&self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
let Some(range) = self.storage.files_range()? else {
return Ok(Box::new(std::iter::empty()))
};
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1))))
}
}
#[cfg(test)]

View File

@ -15,7 +15,7 @@ use tracing::instrument;
/// Each notification is represented by a single file that contains a MessagePack-encoded
/// notification.
#[derive(Debug)]
pub(super) struct Storage {
pub struct Storage {
/// The path to the WAL file.
path: PathBuf,
}
@ -107,23 +107,25 @@ impl Storage {
}
/// Reads the notification from the file with the given id.
#[instrument(target = "exex::wal::storage", skip(self))]
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<ExExNotification> {
debug!(?file_id, "Reading notification from WAL");
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");
let mut file = File::open(&file_path)?;
read_notification(&mut file)
}
/// Writes the notification to the file with the given id.
#[instrument(target = "exex::wal::storage", skip(self, notification))]
pub(super) fn write_notification(
&self,
file_id: u64,
notification: &ExExNotification,
) -> eyre::Result<()> {
debug!(?file_id, "Writing notification to WAL");
let file_path = self.file_path(file_id);
debug!(?file_path, "Writing notification to WAL");
let mut file = File::create_new(&file_path)?;
write_notification(&mut file, notification)?;