feat(exex): finalize WAL only when all ExExes are on the canonical chain (#11289)

This commit is contained in:
Alexey Shekhirin
2024-09-29 13:29:12 +03:00
committed by GitHub
parent d94462bfef
commit ce1f669b9b
6 changed files with 227 additions and 88 deletions

View File

@ -44,6 +44,7 @@ tokio.workspace = true
## misc
dashmap.workspace = true
eyre.workspace = true
itertools.workspace = true
metrics.workspace = true
parking_lot.workspace = true
serde_json.workspace = true
@ -62,6 +63,7 @@ reth-testing-utils.workspace = true
alloy-genesis.workspace = true
alloy-consensus.workspace = true
rand.workspace = true
secp256k1.workspace = true
tempfile.workspace = true

View File

@ -2,16 +2,19 @@ use crate::{
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
};
use futures::StreamExt;
use itertools::Itertools;
use metrics::Gauge;
use reth_chain_state::ForkChoiceStream;
use reth_chainspec::Head;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::{BlockNumHash, SealedHeader};
use reth_provider::HeaderProvider;
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
fmt::Debug,
future::{poll_fn, Future},
ops::Not,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
@ -183,7 +186,10 @@ pub struct ExExManagerMetrics {
/// - Error handling
/// - Monitoring
#[derive(Debug)]
pub struct ExExManager {
pub struct ExExManager<P> {
/// Provider for querying headers.
provider: P,
/// Handles to communicate with the `ExEx`'s.
exex_handles: Vec<ExExHandle>,
@ -223,7 +229,7 @@ pub struct ExExManager {
metrics: ExExManagerMetrics,
}
impl ExExManager {
impl<P> ExExManager<P> {
/// Create a new [`ExExManager`].
///
/// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
@ -232,6 +238,7 @@ impl ExExManager {
/// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
/// notifications over [`ExExManagerHandle`]s until there is capacity again.
pub fn new(
provider: P,
handles: Vec<ExExHandle>,
max_capacity: usize,
wal: Wal,
@ -254,6 +261,8 @@ impl ExExManager {
metrics.num_exexs.set(num_exexs as f64);
Self {
provider,
exex_handles: handles,
handle_rx,
@ -309,70 +318,84 @@ impl ExExManager {
}
}
impl Future for ExExManager {
impl<P> ExExManager<P>
where
    P: HeaderProvider,
{
    /// Finalizes the WAL according to the passed finalized header.
    ///
    /// The WAL is finalized only when every ExEx has reported a finished height whose block
    /// hash is known to the provider, which this function treats as "being on the canonical
    /// chain". An ExEx that has not reported a finished height yet counts as not canonical, so
    /// the WAL is left untouched in that case and a debug message is logged instead.
    ///
    /// # Errors
    ///
    /// Returns an error if a provider lookup or the WAL finalization itself fails.
    fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
        debug!(header = ?finalized_header.num_hash(), "Received finalized header");

        // Check if all ExExes are on the canonical chain
        let exex_finished_heights = self
            .exex_handles
            .iter()
            // Get ExEx ID and hash of the finished height for each ExEx
            .map(|exex_handle| {
                (&exex_handle.id, exex_handle.finished_height.map(|block| block.hash))
            })
            // Deduplicate all hashes, so that every distinct hash is looked up only once. As a
            // consequence, only the first ExEx for each distinct hash is kept.
            .unique_by(|(_, hash)| *hash)
            // Check if hashes are canonical
            .map(|(exex_id, hash)| {
                // An ExEx without a finished height is never considered canonical
                hash.map_or(Ok((exex_id, hash, false)), |hash| {
                    self.provider
                        .is_known(&hash)
                        // Save the ExEx ID, hash of the finished height, and whether the hash
                        // is canonical
                        .map(|is_canonical| (exex_id, Some(hash), is_canonical))
                })
            })
            // We collect here to be able to log the unfinalized ExExes below
            .collect::<Result<Vec<_>, _>>()?;

        if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
            // All ExExes are on the canonical chain, finalize the WAL with the new finalized
            // header
            self.wal.finalize(finalized_header.num_hash())?;
        } else {
            let unfinalized_exexes = exex_finished_heights
                .into_iter()
                .filter_map(|(exex_id, hash, is_canonical)| {
                    is_canonical.not().then_some((exex_id, hash))
                })
                .format_with(", ", |(exex_id, hash), f| f(&format_args!("{exex_id:?} = {hash:?}")))
                // The value returned by `format_with` panics if it is formatted more than
                // once, and the logging machinery may format a field value several times.
                // Render it into an owned string up front so that logging can never panic.
                .to_string();
            debug!(
                %unfinalized_exexes,
                "Not all ExExes are on the canonical chain, can't finalize the WAL"
            );
        }

        Ok(())
    }
}
impl<P> Future for ExExManager<P>
where
P: HeaderProvider + Unpin + 'static,
{
type Output = eyre::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Drain the finalized header stream and grab the last finalized header
let mut last_finalized_header = None;
while let Poll::Ready(finalized_header) = self.finalized_header_stream.poll_next_unpin(cx) {
last_finalized_header = finalized_header;
}
// If there is a finalized header, finalize the WAL with it
if let Some(header) = last_finalized_header {
self.wal.finalize((header.number, header.hash()).into())?;
}
/// Main loop of the [`ExExManager`]. The order of operations is as follows:
/// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
/// the latest state of [`ExExEvent::FinishedHeight`] events.
/// 2. Finalize the WAL with the finalized header, if necessary.
/// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
/// the internal buffer capacity.
/// 4. Send notifications from the internal buffer to those ExExes that are ready to receive new
/// notifications.
/// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
/// update the internal buffer capacity.
/// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// drain handle notifications
while self.buffer.len() < self.max_capacity {
if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) {
debug!(
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
self.push_notification(notification);
continue
}
break
}
// update capacity
self.update_capacity();
// advance all poll senders
let mut min_id = usize::MAX;
for idx in (0..self.exex_handles.len()).rev() {
let mut exex = self.exex_handles.swap_remove(idx);
// it is a logic error for this to ever underflow since the manager manages the
// notification IDs
let notification_index = exex
.next_notification_id
.checked_sub(self.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = self.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// the channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
}
min_id = min_id.min(exex.next_notification_id);
self.exex_handles.push(exex);
}
// remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
self.buffer.retain(|&(id, _)| id >= min_id);
self.min_id = min_id;
// update capacity
self.update_capacity();
// handle incoming exex events
for exex in &mut self.exex_handles {
// Handle incoming ExEx events
for exex in &mut this.exex_handles {
while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
debug!(exex_id = %exex.id, ?event, "Received event from exex");
debug!(exex_id = %exex.id, ?event, "Received event from ExEx");
exex.metrics.events_sent_total.increment(1);
match event {
ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
@ -380,12 +403,67 @@ impl Future for ExExManager {
}
}
// update watch channel block number
let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
// Drain the finalized header stream and finalize the WAL with the last header
let mut last_finalized_header = None;
while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
last_finalized_header = finalized_header;
}
if let Some(header) = last_finalized_header {
this.finalize_wal(header)?;
}
// Drain handle notifications
while this.buffer.len() < this.max_capacity {
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
this.push_notification(notification);
continue
}
break
}
// Update capacity
this.update_capacity();
// Advance all poll senders
let mut min_id = usize::MAX;
for idx in (0..this.exex_handles.len()).rev() {
let mut exex = this.exex_handles.swap_remove(idx);
// It is a logic error for this to ever underflow since the manager manages the
// notification IDs
let notification_index = exex
.next_notification_id
.checked_sub(this.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = this.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// The channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
}
min_id = min_id.min(exex.next_notification_id);
this.exex_handles.push(exex);
}
// Remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
this.buffer.retain(|&(id, _)| id >= min_id);
this.min_id = min_id;
// Update capacity
this.update_capacity();
// Update watch channel block number
let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
});
if let Ok(finished_height) = finished_height {
let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height));
let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
}
Poll::Pending
@ -517,8 +595,9 @@ mod tests {
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::StreamExt;
use rand::Rng;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::Chain;
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
use reth_testing_utils::generators::{self, random_block};
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
@ -551,11 +630,11 @@ mod tests {
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_exexs());
assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
.handle
.has_exexs());
}
@ -568,13 +647,19 @@ mod tests {
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_capacity());
assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream())
.handle
.has_capacity());
assert!(ExExManager::new(
(),
vec![exex_handle_1],
10,
wal,
empty_finalized_header_stream()
)
.handle
.has_capacity());
}
#[test]
@ -587,7 +672,7 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager =
ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream());
ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
@ -637,8 +722,13 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
let mut exex_manager =
ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream());
let mut exex_manager = ExExManager::new(
(),
vec![exex_handle],
max_capacity,
wal,
empty_finalized_header_stream(),
);
// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
@ -671,6 +761,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
@ -683,6 +775,7 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
@ -717,6 +810,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
@ -731,6 +826,7 @@ mod tests {
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
@ -761,6 +857,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
@ -778,6 +876,7 @@ mod tests {
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
@ -812,12 +911,15 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
// Create an ExExManager with a small max capacity
let max_capacity = 2;
let mut exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle_1],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
@ -1014,37 +1116,75 @@ mod tests {
async fn test_exex_wal_finalize() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let block = random_block(&mut generators::rng(), 0, Default::default())
let provider_factory = create_test_provider_factory();
let block = random_block(&mut rng, 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(block.clone())?;
provider_rw.commit()?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
wal.commit(&notification)?;
let (tx, rx) = watch::channel(None);
let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);
let (exex_handle, _, _) =
let (exex_handle, events_tx, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let mut exex_manager =
std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream));
let mut exex_manager = std::pin::pin!(ExExManager::new(
provider_factory,
vec![exex_handle],
1,
wal,
finalized_header_stream
));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
// Send a `FinishedHeight` event with a non-canonical block
events_tx
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification]
);
tx.send(Some(block.header.clone()))?;
// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
Ok(())

View File

@ -179,7 +179,7 @@ where
/// If the head block is not found in the database, it means we're not on the canonical chain
/// and we need to revert the notification with the ExEx head block.
fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification>> {
if self.provider.header(&self.exex_head.block.hash)?.is_some() {
if self.provider.is_known(&self.exex_head.block.hash)? {
debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
return Ok(None)
}

View File

@ -117,12 +117,6 @@ impl WalInner {
Ok(())
}
/// Finalizes the WAL to the given block, inclusive.
///
/// 1. Finds a notification with first unfinalized block (first notification containing a
/// committed block higher than `to_block`).
/// 2. Removes the notifications from the beginning of WAL until the found notification. If this
/// notification includes both finalized and non-finalized blocks, it will not be removed.
#[instrument(target = "exex::wal", skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
// First, walk cache to find the file ID of the notification with the finalized block and