feat(exex): commit only notifications with unfinalized blocks to WAL (#11638)

This commit is contained in:
Alexey Shekhirin
2024-10-10 14:13:14 +01:00
committed by GitHub
parent 8a11830272
commit 90cb3629a5
3 changed files with 108 additions and 37 deletions

View File

@ -34,6 +34,18 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
/// or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
/// The source of the notification.
///
/// This distinguishment is needed to not commit any pipeline notificatations to [WAL](`Wal`),
/// because they are already finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
/// The notification was sent from the pipeline.
Pipeline,
/// The notification was sent from the blockchain tree.
BlockchainTree,
}
/// Metrics for an `ExEx`.
#[derive(Metrics)]
#[metrics(scope = "exex")]
@ -197,7 +209,7 @@ pub struct ExExManager<P> {
exex_handles: Vec<ExExHandle>,
/// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
handle_rx: UnboundedReceiver<ExExNotification>,
handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>,
/// The minimum notification ID currently present in the buffer.
min_id: usize,
@ -429,14 +441,23 @@ where
// Drain handle notifications
while this.buffer.len() < this.max_capacity {
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
target: "exex::manager",
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
this.wal.commit(&notification)?;
if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
let committed_tip = notification.committed_chain().map(|chain| chain.tip().number);
let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number);
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
// Commit to WAL only notifications from blockchain tree. Pipeline notifications
// always contain only finalized blocks.
match source {
ExExNotificationSource::BlockchainTree => {
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
this.wal.commit(&notification)?;
}
ExExNotificationSource::Pipeline => {
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
}
}
this.push_notification(notification);
continue
}
@ -491,7 +512,7 @@ where
#[derive(Debug)]
pub struct ExExManagerHandle {
/// Channel to send notifications to the `ExEx` manager.
exex_tx: UnboundedSender<ExExNotification>,
exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>,
/// The number of `ExEx`'s running on the node.
num_exexs: usize,
/// A watch channel denoting whether the manager is ready for new notifications or not.
@ -533,8 +554,12 @@ impl ExExManagerHandle {
/// Synchronously send a notification over the channel to all execution extensions.
///
/// Senders should call [`Self::has_capacity`] first.
pub fn send(&self, notification: ExExNotification) -> Result<(), SendError<ExExNotification>> {
self.exex_tx.send(notification)
pub fn send(
&self,
source: ExExNotificationSource,
notification: ExExNotification,
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
self.exex_tx.send((source, notification))
}
/// Asynchronously send a notification over the channel to all execution extensions.
@ -543,10 +568,11 @@ impl ExExManagerHandle {
/// capacity in the channel, the future will wait.
pub async fn send_async(
&mut self,
source: ExExNotificationSource,
notification: ExExNotification,
) -> Result<(), SendError<ExExNotification>> {
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
self.ready().await;
self.exex_tx.send(notification)
self.exex_tx.send((source, notification))
}
/// Get the current capacity of the `ExEx` manager's internal notification buffer.
@ -610,16 +636,16 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use alloy_primitives::B256;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
Chain, TransactionVariant,
BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant,
};
use reth_testing_utils::generators;
use reth_testing_utils::generators::{self, random_block, BlockParams};
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
@ -959,9 +985,21 @@ mod tests {
};
// Send notifications to go over the max capacity
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification).unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification))
.unwrap();
// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
@ -1177,6 +1215,18 @@ mod tests {
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.seal_with_senders()
.unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.insert_block(block.clone()).unwrap();
provider_rw.commit().unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
@ -1190,33 +1240,49 @@ mod tests {
wal.handle(),
);
let notification = ExExNotification::ChainCommitted {
let genesis_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
let (finalized_headers_tx, rx) = watch::channel(None);
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
let finalized_header_stream = ForkChoiceStream::new(rx);
let mut exex_manager = std::pin::pin!(ExExManager::new(
provider,
vec![exex_handle],
1,
2,
wal,
finalized_header_stream
));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
exex_manager.handle().send(notification.clone())?;
exex_manager
.handle()
.send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(genesis_notification))
);
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(notification.clone()))
);
// WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
@ -1229,7 +1295,7 @@ mod tests {
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
@ -1239,12 +1305,12 @@ mod tests {
);
// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap();
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
Ok(())
}

View File

@ -6,7 +6,8 @@ use futures::future;
use reth_chain_state::ForkChoiceSubscriptions;
use reth_chainspec::EthChainSpec;
use reth_exex::{
ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal, DEFAULT_EXEX_MANAGER_CAPACITY,
ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
DEFAULT_EXEX_MANAGER_CAPACITY,
};
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_primitives::Head;
@ -47,6 +48,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
return Ok(None)
}
info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
let exex_wal = Wal::new(
config_container
.config
@ -127,7 +129,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
.send_async(notification.into())
.send_async(ExExNotificationSource::BlockchainTree, notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}

View File

@ -9,7 +9,7 @@ use reth_evm::{
metrics::ExecutorMetrics,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives::{Header, SealedHeader, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
@ -389,9 +389,10 @@ where
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainCommitted { new: Arc::new(chain) });
let _ = self.exex_manager_handle.send(
ExExNotificationSource::Pipeline,
ExExNotification::ChainCommitted { new: Arc::new(chain) },
);
Ok(())
}
@ -477,8 +478,10 @@ where
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ =
self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
let _ = self.exex_manager_handle.send(
ExExNotificationSource::Pipeline,
ExExNotification::ChainReverted { old: Arc::new(chain) },
);
Ok(())
}