feat(exex): subscribe to notifications with head using ExExContext (#11500)

This commit is contained in:
Alexey Shekhirin
2024-10-07 18:31:15 +03:00
committed by GitHub
parent 04fa08cd4c
commit 8a0bcbb356
3 changed files with 240 additions and 123 deletions

View File

@ -1,5 +1,6 @@
use std::fmt::Debug;
use reth_exex_types::ExExHead;
use reth_node_api::{FullNodeComponents, NodeTypes, NodeTypesWithEngine};
use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head;
@ -32,7 +33,7 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// considered delivered by the node.
pub notifications: ExExNotifications<Node::Provider, Node::Executor>,
/// node components
/// Node components
pub components: Node,
}
@ -92,4 +93,16 @@ impl<Node: FullNodeComponents> ExExContext<Node> {
pub fn task_executor(&self) -> &TaskExecutor {
self.components.task_executor()
}
/// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of
/// notifications without a head.
pub fn set_notifications_without_head(&mut self) {
self.notifications.set_without_head();
}
/// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications
/// with the provided head.
pub fn set_notifications_with_head(&mut self, head: ExExHead) {
self.notifications.set_with_head(head);
}
}

View File

@ -610,12 +610,16 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::{FutureExt, StreamExt};
use futures::StreamExt;
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
use reth_testing_utils::generators::{self, random_block};
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
Chain, TransactionVariant,
};
use reth_testing_utils::generators;
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
@ -975,11 +979,20 @@ mod tests {
#[tokio::test]
async fn exex_handle_new() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
// Check initial state
assert_eq!(exex_handle.id, "test_exex");
@ -1008,7 +1021,7 @@ mod tests {
// Send a notification and ensure it's received correctly
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
@ -1021,11 +1034,20 @@ mod tests {
#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
@ -1046,11 +1068,7 @@ mod tests {
poll_fn(|cx| {
// The notification should be skipped, so nothing should be sent.
// Check that the receiver channel is indeed empty
assert_eq!(
notifications.poll_next_unpin(cx),
Poll::Pending,
"Receiver channel should be empty"
);
assert!(notifications.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
@ -1066,11 +1084,20 @@ mod tests {
#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
@ -1086,7 +1113,7 @@ mod tests {
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
@ -1100,11 +1127,20 @@ mod tests {
#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
@ -1117,7 +1153,7 @@ mod tests {
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
@ -1135,30 +1171,34 @@ mod tests {
let mut rng = generators::rng();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory).unwrap();
let genesis_block = provider_factory
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let block = random_block(&mut rng, 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(block.clone())?;
provider_rw.commit()?;
let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider.clone(),
EthExecutorProvider::mainnet(),
wal.handle(),
);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};
let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);
let (exex_handle, events_tx, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let mut exex_manager = std::pin::pin!(ExExManager::new(
provider_factory,
provider,
vec![exex_handle],
1,
wal,
@ -1170,16 +1210,13 @@ mod tests {
exex_manager.handle().send(notification.clone())?;
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.next().poll_unpin(&mut cx),
Poll::Ready(Some(notification.clone()))
);
assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
@ -1192,7 +1229,7 @@ mod tests {
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
@ -1202,9 +1239,9 @@ mod tests {
);
// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());

View File

@ -13,8 +13,129 @@ use std::{
};
use tokio::sync::mpsc::Receiver;
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
#[derive(Debug)]
pub struct ExExNotifications<P, E> {
inner: ExExNotificationsInner<P, E>,
}
#[derive(Debug)]
enum ExExNotificationsInner<P, E> {
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
WithoutHead(ExExNotificationsWithoutHead<P, E>),
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
/// are committed or reverted after the given head.
WithHead(ExExNotificationsWithHead<P, E>),
/// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
/// [`ExExNotificationsInner::WithHead`].
Invalid,
}
impl<P, E> ExExNotifications<P, E> {
/// Creates a new stream of [`ExExNotifications`] without a head.
pub const fn new(
node_head: Head,
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
wal_handle: WalHandle,
) -> Self {
Self {
inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
node_head,
provider,
executor,
notifications,
wal_handle,
)),
}
}
/// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head.
///
/// It's a no-op if the stream has already been configured without a head.
///
/// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
pub fn set_without_head(&mut self) {
let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
self.inner = ExExNotificationsInner::WithoutHead(match current {
ExExNotificationsInner::WithoutHead(notifications) => notifications,
ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
notifications.node_head,
notifications.provider,
notifications.executor,
notifications.notifications,
notifications.wal_handle,
),
ExExNotificationsInner::Invalid => unreachable!(),
});
}
/// Returns a new [`ExExNotifications`] without a head.
///
/// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
pub fn without_head(mut self) -> Self {
self.set_without_head();
self
}
/// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head.
///
/// It's a no-op if the stream has already been configured with a head.
///
/// See the documentation of [`ExExNotificationsWithHead`] for more details.
pub fn set_with_head(&mut self, exex_head: ExExHead) {
let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
self.inner = ExExNotificationsInner::WithHead(match current {
ExExNotificationsInner::WithoutHead(notifications) => {
notifications.with_head(exex_head)
}
ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new(
notifications.node_head,
notifications.provider,
notifications.executor,
notifications.notifications,
notifications.wal_handle,
exex_head,
),
ExExNotificationsInner::Invalid => unreachable!(),
});
}
/// Returns a new [`ExExNotifications`] with the provided head.
///
/// See the documentation of [`ExExNotificationsWithHead`] for more details.
pub fn with_head(mut self, exex_head: ExExHead) -> Self {
self.set_with_head(exex_head);
self
}
}
impl<P, E> Stream for ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
type Item = eyre::Result<ExExNotification>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match &mut self.get_mut().inner {
ExExNotificationsInner::WithoutHead(notifications) => {
notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
}
ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
ExExNotificationsInner::Invalid => unreachable!(),
}
}
}
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
pub struct ExExNotificationsWithoutHead<P, E> {
node_head: Head,
provider: P,
executor: E,
@ -22,7 +143,7 @@ pub struct ExExNotifications<P, E> {
wal_handle: WalHandle,
}
impl<P: Debug, E: Debug> Debug for ExExNotifications<P, E> {
impl<P: Debug, E: Debug> Debug for ExExNotificationsWithoutHead<P, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExExNotifications")
.field("provider", &self.provider)
@ -32,9 +153,9 @@ impl<P: Debug, E: Debug> Debug for ExExNotifications<P, E> {
}
}
impl<P, E> ExExNotifications<P, E> {
/// Creates a new instance of [`ExExNotifications`].
pub const fn new(
impl<P, E> ExExNotificationsWithoutHead<P, E> {
/// Creates a new instance of [`ExExNotificationsWithoutHead`].
const fn new(
node_head: Head,
provider: P,
executor: E,
@ -44,70 +165,8 @@ impl<P, E> ExExNotifications<P, E> {
Self { node_head, provider, executor, notifications, wal_handle }
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. This indicates that no
/// further values can ever be received from this `Receiver`. The channel is
/// closed when all senders have been dropped, or when [`Receiver::close`] is called.
///
/// # Cancel safety
///
/// This method is cancel safe. If `recv` is used as the event in a
/// [`tokio::select!`] statement and some other branch
/// completes first, it is guaranteed that no messages were received on this
/// channel.
///
/// For full documentation, see [`Receiver::recv`].
#[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")]
pub async fn recv(&mut self) -> Option<ExExNotification> {
self.notifications.recv().await
}
/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was
/// closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
/// failure has been resolved. Note that receiving such a wakeup does not
/// guarantee that the next call will succeed — it could fail with another
/// spurious failure.
///
/// For full documentation, see [`Receiver::poll_recv`].
#[deprecated(
note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead"
)]
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<ExExNotification>> {
self.notifications.poll_recv(cx)
}
}
impl<P, E> ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
/// Subscribe to notifications with the given head. This head is the ExEx's
/// latest view of the host chain.
///
/// Notifications will be sent starting from the head, not inclusive. For
/// example, if `head.number == 10`, then the first notification will be
/// with `block.number == 11`. A `head.number` of 10 indicates that the ExEx
/// has processed up to block 10, and is ready to process block 11.
pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
/// Subscribe to notifications with the given head.
fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
ExExNotificationsWithHead::new(
self.node_head,
self.provider,
@ -119,7 +178,7 @@ where
}
}
impl<P: Unpin, E: Unpin> Stream for ExExNotifications<P, E> {
impl<P: Unpin, E: Unpin> Stream for ExExNotificationsWithoutHead<P, E> {
type Item = ExExNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -128,7 +187,13 @@ impl<P: Unpin, E: Unpin> Stream for ExExNotifications<P, E> {
}
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
/// committed or reverted after the given head.
/// committed or reverted after the given head. The head is the ExEx's latest view of the host
/// chain.
///
/// Notifications will be sent starting from the head, not inclusive. For example, if
/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
/// process block 11.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<P, E> {
node_head: Head,
@ -147,13 +212,9 @@ pub struct ExExNotificationsWithHead<P, E> {
backfill_job: Option<StreamBackfillJob<E, P, Chain>>,
}
impl<P, E> ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
impl<P, E> ExExNotificationsWithHead<P, E> {
/// Creates a new [`ExExNotificationsWithHead`].
pub const fn new(
const fn new(
node_head: Head,
provider: P,
executor: E,
@ -173,7 +234,13 @@ where
backfill_job: None,
}
}
}
impl<P, E> ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
/// Checks if the ExEx head is on the canonical chain.
///
/// If the head block is not found in the database or it's ahead of the node head, it means
@ -367,7 +434,7 @@ mod tests {
notifications_tx.send(notification.clone()).await?;
let mut notifications = ExExNotifications::new(
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
@ -438,7 +505,7 @@ mod tests {
notifications_tx.send(notification.clone()).await?;
let mut notifications = ExExNotifications::new(
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
@ -528,7 +595,7 @@ mod tests {
notifications_tx.send(new_notification.clone()).await?;
let mut notifications = ExExNotifications::new(
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
@ -609,7 +676,7 @@ mod tests {
notifications_tx.send(new_notification.clone()).await?;
let mut notifications = ExExNotifications::new(
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthExecutorProvider::mainnet(),