feat(exex): backfill on subscription with head (#10787)

This commit is contained in:
Alexey Shekhirin
2024-09-17 13:32:41 +01:00
committed by GitHub
parent 346eb308cd
commit 0cde072453
8 changed files with 507 additions and 56 deletions

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies]
## reth
reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true
reth-exex-types.workspace = true
@ -40,7 +41,6 @@ metrics.workspace = true
[dev-dependencies]
reth-blockchain-tree.workspace = true
reth-chainspec.workspace = true
reth-db-api.workspace = true
reth-db-common.workspace = true
reth-evm-ethereum.workspace = true

View File

@ -30,13 +30,18 @@ pub struct ExExContext<Node: FullNodeComponents> {
///
/// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is
/// considered delivered by the node.
pub notifications: ExExNotifications<Node>,
pub notifications: ExExNotifications<Node::Provider, Node::Executor>,
/// node components
pub components: Node,
}
impl<Node: FullNodeComponents> Debug for ExExContext<Node> {
impl<Node> Debug for ExExContext<Node>
where
Node: FullNodeComponents,
Node::Provider: Debug,
Node::Executor: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExExContext")
.field("head", &self.head)

View File

@ -1,9 +1,15 @@
use crate::{ExExEvent, ExExNotification, FinishedExExHeight};
use futures::Stream;
use crate::{
BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob,
};
use eyre::OptionExt;
use futures::{Stream, StreamExt};
use metrics::Gauge;
use reth_chainspec::Head;
use reth_evm::execute::BlockExecutorProvider;
use reth_exex_types::ExExHead;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, U256};
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
@ -60,13 +66,16 @@ impl ExExHandle {
///
/// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
/// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
pub fn new<Node>(
pub fn new<P, E>(
id: String,
components: Node,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<Node>) {
node_head: Head,
provider: P,
executor: E,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let notifications = ExExNotifications { components, notifications: notification_rx };
let notifications =
ExExNotifications { node_head, provider, executor, notifications: notification_rx };
(
Self {
@ -145,24 +154,32 @@ impl ExExHandle {
}
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
pub struct ExExNotifications<Node> {
components: Node,
pub struct ExExNotifications<P, E> {
node_head: Head,
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
}
impl<Node> Debug for ExExNotifications<Node> {
impl<P: Debug, E: Debug> Debug for ExExNotifications<P, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExExNotifications")
.field("components", &"...")
.field("provider", &self.provider)
.field("executor", &self.executor)
.field("notifications", &self.notifications)
.finish()
}
}
impl<Node> ExExNotifications<Node> {
impl<P, E> ExExNotifications<P, E> {
/// Creates a new instance of [`ExExNotifications`].
pub const fn new(components: Node, notifications: Receiver<ExExNotification>) -> Self {
Self { components, notifications }
pub const fn new(
node_head: Head,
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
) -> Self {
Self { node_head, provider, executor, notifications }
}
/// Receives the next value for this receiver.
@ -214,23 +231,29 @@ impl<Node> ExExNotifications<Node> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<ExExNotification>> {
self.notifications.poll_recv(cx)
}
}
// TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`]
impl<P, E> ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
/// Subscribe to notifications with the given head.
///
/// Notifications will be sent starting from the head, not inclusive. For example, if
/// `head.number == 10`, then the first notification will be with `block.number == 11`.
#[allow(dead_code)]
fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<Node> {
ExExNotificationsWithHead {
components: self.components,
notifications: self.notifications,
pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
ExExNotificationsWithHead::new(
self.node_head,
self.provider,
self.executor,
self.notifications,
head,
}
)
}
}
impl<Node: Unpin> Stream for ExExNotifications<Node> {
impl<P: Unpin, E: Unpin> Stream for ExExNotifications<P, E> {
type Item = ExExNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -241,31 +264,212 @@ impl<Node: Unpin> Stream for ExExNotifications<Node> {
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
/// committed or reverted after the given head.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<Node> {
#[allow(dead_code)]
components: Node,
pub struct ExExNotificationsWithHead<P, E> {
node_head: Head,
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
head: ExExHead,
exex_head: ExExHead,
pending_sync: bool,
/// The backfill job to run before consuming any notifications.
backfill_job: Option<StreamBackfillJob<E, P, Chain>>,
/// Whether we're currently waiting for the node head to catch up to the same height as the
/// ExEx head.
node_head_catchup_in_progress: bool,
}
impl<Node: Unpin> Stream for ExExNotificationsWithHead<Node> {
type Item = ExExNotification;
impl<P, E> ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
/// Creates a new [`ExExNotificationsWithHead`].
pub const fn new(
node_head: Head,
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
exex_head: ExExHead,
) -> Self {
Self {
node_head,
provider,
executor,
notifications,
exex_head,
pending_sync: true,
backfill_job: None,
node_head_catchup_in_progress: false,
}
}
/// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch.
///
/// Possible situations are:
/// - ExEx is behind the node head (`node_head.number < exex_head.number`).
/// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database).
/// Backfill from the node database.
/// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
/// Unwind the ExEx to the first block matching between the ExEx and the node, and then
/// bacfkill from the node database.
/// - ExEx is at the same block number (`node_head.number == exex_head.number`).
/// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing
/// to do.
/// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
/// Unwind the ExEx to the first block matching between the ExEx and the node, and then
/// backfill from the node database.
/// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the
/// node head catches up to the ExEx head, and then repeat the synchronization process.
fn synchronize(&mut self) -> eyre::Result<()> {
debug!(target: "exex::manager", "Synchronizing ExEx head");
let backfill_job_factory =
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
match self.exex_head.block.number.cmp(&self.node_head.number) {
std::cmp::Ordering::Less => {
// ExEx is behind the node head
if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
// ExEx is on the canonical chain
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain");
if exex_header.number != self.exex_head.block.number {
eyre::bail!("ExEx head number does not match the hash")
}
// ExEx is on the canonical chain, start backfill
let backfill = backfill_job_factory
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
.into_stream();
self.backfill_job = Some(backfill);
} else {
debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain");
// ExEx is not on the canonical chain, first unwind it and then backfill
// TODO(alexey): unwind and backfill
self.backfill_job = None;
}
}
#[allow(clippy::branches_sharing_code)]
std::cmp::Ordering::Equal => {
// ExEx is at the same block height as the node head
if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
// ExEx is on the canonical chain
debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain");
if exex_header.number != self.exex_head.block.number {
eyre::bail!("ExEx head number does not match the hash")
}
// ExEx is on the canonical chain and the same as the node head, no need to
// backfill
self.backfill_job = None;
} else {
// ExEx is not on the canonical chain, first unwind it and then backfill
debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain");
// TODO(alexey): unwind and backfill
self.backfill_job = None;
}
}
std::cmp::Ordering::Greater => {
debug!(target: "exex::manager", "ExEx is ahead of the node head");
// ExEx is ahead of the node head
// TODO(alexey): wait until the node head is at the same height as the ExEx head
// and then repeat the process above
self.node_head_catchup_in_progress = true;
}
};
Ok(())
}
}
impl<P, E> Stream for ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
type Item = eyre::Result<ExExNotification>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// TODO(alexey): backfill according to the head
if this.pending_sync {
this.synchronize()?;
this.pending_sync = false;
}
if let Some(backfill_job) = &mut this.backfill_job {
if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)) {
return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
new: Arc::new(chain?),
})))
}
// Backfill job is done, remove it
this.backfill_job = None;
}
loop {
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
// 1. Either committed or reverted chain from the notification.
// 2. Block number of the tip of the canonical chain:
// - For committed chain, it's the tip block number.
// - For reverted chain, it's the block number preceding the first block in the chain.
let (chain, tip) = notification
.committed_chain()
.map(|chain| (chain.clone(), chain.tip().number))
.or_else(|| {
notification
.reverted_chain()
.map(|chain| (chain.clone(), chain.first().number - 1))
})
.unzip();
if this.node_head_catchup_in_progress {
// If we are waiting for the node head to catch up to the same height as the ExEx
// head, then we need to check if the ExEx is on the canonical chain.
// Query the chain from the new notification for the ExEx head block number.
let exex_head_block = chain
.as_ref()
.and_then(|chain| chain.blocks().get(&this.exex_head.block.number));
// Compare the hash of the block from the new notification to the ExEx head
// hash.
if let Some((block, tip)) = exex_head_block.zip(tip) {
if block.hash() == this.exex_head.block.hash {
// ExEx is on the canonical chain, proceed with the notification
this.node_head_catchup_in_progress = false;
} else {
// ExEx is not on the canonical chain, synchronize
let tip =
this.provider.sealed_header(tip)?.ok_or_eyre("node head not found")?;
this.node_head = Head::new(
tip.number,
tip.hash(),
tip.difficulty,
U256::MAX,
tip.timestamp,
);
this.synchronize()?;
}
}
}
if notification
.committed_chain()
.or_else(|| notification.reverted_chain())
.map_or(false, |chain| chain.first().number > this.head.block.number)
.map_or(false, |chain| chain.first().number > this.exex_head.block.number)
{
return Poll::Ready(Some(notification))
return Poll::Ready(Some(Ok(notification)))
}
}
}
@ -606,13 +810,19 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use futures::StreamExt;
use reth_primitives::{SealedBlockWithSenders, B256};
use reth_provider::Chain;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{Block, BlockNumHash, Header, SealedBlockWithSenders, B256};
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
BlockWriter, Chain,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
#[tokio::test]
async fn test_delivers_events() {
let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -622,7 +832,8 @@ mod tests {
#[tokio::test]
async fn test_has_exexs() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
assert!(!ExExManager::new(vec![], 0).handle.has_exexs());
@ -631,7 +842,8 @@ mod tests {
#[tokio::test]
async fn test_has_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
assert!(!ExExManager::new(vec![], 0).handle.has_capacity());
@ -640,7 +852,7 @@ mod tests {
#[test]
fn test_push_notification() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ());
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(vec![exex_handle], 10);
@ -685,7 +897,7 @@ mod tests {
#[test]
fn test_update_capacity() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ());
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
@ -720,7 +932,7 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height() {
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Check initial block height
assert!(exex_handle.finished_height.is_none());
@ -757,8 +969,10 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_lower() {
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ());
let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ());
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ());
// Send events to update the block heights of the two handles, with the second being lower
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -788,8 +1002,10 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_greater() {
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ());
let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ());
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ());
// Assert that the initial block height is `None` for the first `ExExHandle`.
assert!(exex_handle1.finished_height.is_none());
@ -825,7 +1041,8 @@ mod tests {
#[tokio::test]
async fn test_exex_manager_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
// Create an ExExManager with a small max capacity
let max_capacity = 2;
@ -863,7 +1080,8 @@ mod tests {
#[tokio::test]
async fn exex_handle_new() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Check initial state
assert_eq!(exex_handle.id, "test_exex");
@ -905,7 +1123,8 @@ mod tests {
#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15);
@ -946,7 +1165,8 @@ mod tests {
#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
@ -976,7 +1196,8 @@ mod tests {
#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
@ -1000,4 +1221,220 @@ mod tests {
// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
#[tokio::test]
async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
let mut rng = generators::rng();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory.clone())?;
let node_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(
node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?,
)?;
provider_rw.commit()?;
let node_head = Head {
number: node_head_block.number,
hash: node_head_block.hash(),
..Default::default()
};
let exex_head =
ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
notifications_tx.send(notification.clone()).await?;
let mut notifications = ExExNotifications::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
notifications_rx,
)
.with_head(exex_head);
// First notification is the backfill of missing blocks from the canonical chain
assert_eq!(
notifications.next().await.transpose()?,
Some(ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(
notifications.executor.clone(),
notifications.provider.clone()
)
.backfill(1..=1)
.next()
.ok_or_eyre("failed to backfill")??
)
})
);
// Second notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(notification));
Ok(())
}
#[ignore]
#[tokio::test]
async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> {
Ok(())
}
#[tokio::test]
async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory)?;
let node_head =
Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
let exex_head =
ExExHead { block: BlockNumHash { number: node_head.number, hash: node_head.hash } };
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Block {
header: Header {
parent_hash: node_head.hash,
number: node_head.number + 1,
..Default::default()
},
..Default::default()
}
.seal_slow()
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
notifications_tx.send(notification.clone()).await?;
let mut notifications = ExExNotifications::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
notifications_rx,
)
.with_head(exex_head);
let new_notification = notifications.next().await.transpose()?;
assert_eq!(new_notification, Some(notification));
Ok(())
}
#[ignore]
#[tokio::test]
async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
Ok(())
}
#[tokio::test]
async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
let mut rng = generators::rng();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory)?;
let exex_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let node_head =
Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
let exex_head = ExExHead {
block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
notifications_tx
.send(ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block
.clone()
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
})
.await?;
let mut notifications = ExExNotifications::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
notifications_rx,
)
.with_head(exex_head);
// First notification is skipped because the node is catching up with the ExEx
let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await;
assert!(new_notification.is_pending());
// Imitate the node catching up with the ExEx by sending a notification for the missing
// block
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
exex_head_block.number + 1,
BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
notifications_tx.send(notification.clone()).await?;
// Second notification is received because the node caught up with the ExEx
assert_eq!(notifications.next().await.transpose()?, Some(notification));
Ok(())
}
}

View File

@ -296,7 +296,12 @@ pub async fn test_exex_context_with_chain_spec(
let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel();
let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1);
let notifications = ExExNotifications::new(components.clone(), notifications_rx);
let notifications = ExExNotifications::new(
head,
components.provider.clone(),
components.components.executor.clone(),
notifications_rx,
);
let ctx = ExExContext {
head,

View File

@ -1,8 +1,7 @@
use reth_primitives::BlockNumHash;
#[allow(clippy::doc_markdown)]
/// A head of the ExEx. It determines the highest block committed to the internal ExEx state.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExExHead {
/// The head block.
pub block: BlockNumHash,