mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: ExExEvent::FinishedHeight takes BlockNumHash instead (#11278)
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
@ -125,7 +125,7 @@ async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::
|
|||||||
|
|
||||||
if let Some(committed_chain) = notification.committed_chain() {
|
if let Some(committed_chain) = notification.committed_chain() {
|
||||||
ctx.events
|
ctx.events
|
||||||
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
|
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -23,4 +23,4 @@ event to signify what blocks have been processed. This event is used by Reth to
|
|||||||
|
|
||||||
An ExEx will only receive notifications for block numbers greater than the block in the most recently emitted `FinishedHeight` event.
|
An ExEx will only receive notifications for block numbers greater than the block in the most recently emitted `FinishedHeight` event.
|
||||||
|
|
||||||
To clarify: if an ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for any `block_number > 0`.
|
To clarify: if an ExEx emits `ExExEvent::FinishedHeight` for `block #0` it will receive notifications for any `block_number > 0`.
|
||||||
|
|||||||
@ -279,7 +279,7 @@ async fn remote_exex<Node: FullNodeComponents>(
|
|||||||
while let Some(notification) = ctx.notifications.next().await {
|
while let Some(notification) = ctx.notifications.next().await {
|
||||||
if let Some(committed_chain) = notification.committed_chain() {
|
if let Some(committed_chain) = notification.committed_chain() {
|
||||||
ctx.events
|
ctx.events
|
||||||
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
|
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Notification sent to the gRPC server");
|
info!("Notification sent to the gRPC server");
|
||||||
@ -388,7 +388,7 @@ async fn remote_exex<Node: FullNodeComponents>(
|
|||||||
while let Some(notification) = ctx.notifications.next().await {
|
while let Some(notification) = ctx.notifications.next().await {
|
||||||
if let Some(committed_chain) = notification.committed_chain() {
|
if let Some(committed_chain) = notification.committed_chain() {
|
||||||
ctx.events
|
ctx.events
|
||||||
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
|
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(?notification, "Notification sent to the gRPC server");
|
info!(?notification, "Notification sent to the gRPC server");
|
||||||
|
|||||||
@ -57,7 +57,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
|
|||||||
if let Some(committed_chain) = notification.committed_chain() {
|
if let Some(committed_chain) = notification.committed_chain() {
|
||||||
this.ctx
|
this.ctx
|
||||||
.events
|
.events
|
||||||
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
|
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,7 +152,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
|
|||||||
|
|
||||||
this.ctx
|
this.ctx
|
||||||
.events
|
.events
|
||||||
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
|
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(first_block) = this.first_block {
|
if let Some(first_block) = this.first_block {
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use alloy_primitives::BlockNumber;
|
use reth_primitives::BlockNumHash;
|
||||||
|
|
||||||
/// Events emitted by an `ExEx`.
|
/// Events emitted by an `ExEx`.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
@ -9,5 +9,5 @@ pub enum ExExEvent {
|
|||||||
/// meaning that Reth is allowed to prune them.
|
/// meaning that Reth is allowed to prune them.
|
||||||
///
|
///
|
||||||
/// On reorgs, it's possible for the height to go down.
|
/// On reorgs, it's possible for the height to go down.
|
||||||
FinishedHeight(BlockNumber),
|
FinishedHeight(BlockNumHash),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +1,12 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
|
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
|
||||||
};
|
};
|
||||||
use alloy_primitives::BlockNumber;
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use metrics::Gauge;
|
use metrics::Gauge;
|
||||||
use reth_chain_state::ForkChoiceStream;
|
use reth_chain_state::ForkChoiceStream;
|
||||||
use reth_chainspec::Head;
|
use reth_chainspec::Head;
|
||||||
use reth_metrics::{metrics::Counter, Metrics};
|
use reth_metrics::{metrics::Counter, Metrics};
|
||||||
use reth_primitives::SealedHeader;
|
use reth_primitives::{BlockNumHash, SealedHeader};
|
||||||
use reth_tracing::tracing::debug;
|
use reth_tracing::tracing::debug;
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
@ -53,10 +52,10 @@ pub struct ExExHandle {
|
|||||||
receiver: UnboundedReceiver<ExExEvent>,
|
receiver: UnboundedReceiver<ExExEvent>,
|
||||||
/// The ID of the next notification to send to this `ExEx`.
|
/// The ID of the next notification to send to this `ExEx`.
|
||||||
next_notification_id: usize,
|
next_notification_id: usize,
|
||||||
/// The finished block number of the `ExEx`.
|
/// The finished block of the `ExEx`.
|
||||||
///
|
///
|
||||||
/// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
|
/// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
|
||||||
finished_height: Option<BlockNumber>,
|
finished_height: Option<BlockNumHash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExExHandle {
|
impl ExExHandle {
|
||||||
@ -105,11 +104,11 @@ impl ExExHandle {
|
|||||||
// Skip the chain commit notification if the finished height of the ExEx is
|
// Skip the chain commit notification if the finished height of the ExEx is
|
||||||
// higher than or equal to the tip of the new notification.
|
// higher than or equal to the tip of the new notification.
|
||||||
// I.e., the ExEx has already processed the notification.
|
// I.e., the ExEx has already processed the notification.
|
||||||
if finished_height >= new.tip().number {
|
if finished_height.number >= new.tip().number {
|
||||||
debug!(
|
debug!(
|
||||||
exex_id = %self.id,
|
exex_id = %self.id,
|
||||||
%notification_id,
|
%notification_id,
|
||||||
%finished_height,
|
?finished_height,
|
||||||
new_tip = %new.tip().number,
|
new_tip = %new.tip().number,
|
||||||
"Skipping notification"
|
"Skipping notification"
|
||||||
);
|
);
|
||||||
@ -377,7 +376,7 @@ impl Future for ExExManager {
|
|||||||
|
|
||||||
// update watch channel block number
|
// update watch channel block number
|
||||||
let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
|
let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
|
||||||
exex.finished_height.map_or(Err(()), |height| Ok(height.min(curr)))
|
exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
|
||||||
});
|
});
|
||||||
if let Ok(finished_height) = finished_height {
|
if let Ok(finished_height) = finished_height {
|
||||||
let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height));
|
let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height));
|
||||||
@ -532,9 +531,10 @@ mod tests {
|
|||||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||||
|
|
||||||
// Send an event and check that it's delivered correctly
|
// Send an event and check that it's delivered correctly
|
||||||
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
|
let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
|
||||||
|
event_tx.send(event).unwrap();
|
||||||
let received_event = exex_handle.receiver.recv().await.unwrap();
|
let received_event = exex_handle.receiver.recv().await.unwrap();
|
||||||
assert_eq!(received_event, ExExEvent::FinishedHeight(42));
|
assert_eq!(received_event, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@ -672,7 +672,8 @@ mod tests {
|
|||||||
assert!(exex_handle.finished_height.is_none());
|
assert!(exex_handle.finished_height.is_none());
|
||||||
|
|
||||||
// Update the block height via an event
|
// Update the block height via an event
|
||||||
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
|
let block = BlockNumHash::new(42, B256::random());
|
||||||
|
event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
|
||||||
|
|
||||||
// Create a mock ExExManager and add the exex_handle to it
|
// Create a mock ExExManager and add the exex_handle to it
|
||||||
let exex_manager = ExExManager::new(
|
let exex_manager = ExExManager::new(
|
||||||
@ -690,7 +691,7 @@ mod tests {
|
|||||||
|
|
||||||
// Check that the block height was updated
|
// Check that the block height was updated
|
||||||
let updated_exex_handle = &pinned_manager.exex_handles[0];
|
let updated_exex_handle = &pinned_manager.exex_handles[0];
|
||||||
assert_eq!(updated_exex_handle.finished_height, Some(42));
|
assert_eq!(updated_exex_handle.finished_height, Some(block));
|
||||||
|
|
||||||
// Get the receiver for the finished height
|
// Get the receiver for the finished height
|
||||||
let mut receiver = pinned_manager.handle.finished_height();
|
let mut receiver = pinned_manager.handle.finished_height();
|
||||||
@ -716,9 +717,12 @@ mod tests {
|
|||||||
let (exex_handle2, event_tx2, _) =
|
let (exex_handle2, event_tx2, _) =
|
||||||
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
|
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
|
||||||
|
|
||||||
|
let block1 = BlockNumHash::new(42, B256::random());
|
||||||
|
let block2 = BlockNumHash::new(10, B256::random());
|
||||||
|
|
||||||
// Send events to update the block heights of the two handles, with the second being lower
|
// Send events to update the block heights of the two handles, with the second being lower
|
||||||
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
|
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
|
||||||
event_tx2.send(ExExEvent::FinishedHeight(10)).unwrap();
|
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
|
||||||
|
|
||||||
let exex_manager = ExExManager::new(
|
let exex_manager = ExExManager::new(
|
||||||
vec![exex_handle1, exex_handle2],
|
vec![exex_handle1, exex_handle2],
|
||||||
@ -760,9 +764,12 @@ mod tests {
|
|||||||
// Assert that the initial block height is `None` for the first `ExExHandle`.
|
// Assert that the initial block height is `None` for the first `ExExHandle`.
|
||||||
assert!(exex_handle1.finished_height.is_none());
|
assert!(exex_handle1.finished_height.is_none());
|
||||||
|
|
||||||
|
let block1 = BlockNumHash::new(42, B256::random());
|
||||||
|
let block2 = BlockNumHash::new(100, B256::random());
|
||||||
|
|
||||||
// Send events to update the block heights of the two handles, with the second being higher.
|
// Send events to update the block heights of the two handles, with the second being higher.
|
||||||
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
|
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
|
||||||
event_tx2.send(ExExEvent::FinishedHeight(100)).unwrap();
|
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
|
||||||
|
|
||||||
let exex_manager = ExExManager::new(
|
let exex_manager = ExExManager::new(
|
||||||
vec![exex_handle1, exex_handle2],
|
vec![exex_handle1, exex_handle2],
|
||||||
@ -896,7 +903,7 @@ mod tests {
|
|||||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||||
|
|
||||||
// Set finished_height to a value higher than the block tip
|
// Set finished_height to a value higher than the block tip
|
||||||
exex_handle.finished_height = Some(15);
|
exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
|
||||||
|
|
||||||
let mut block1 = SealedBlockWithSenders::default();
|
let mut block1 = SealedBlockWithSenders::default();
|
||||||
block1.block.header.set_hash(B256::new([0x01; 32]));
|
block1.block.header.set_hash(B256::new([0x01; 32]));
|
||||||
@ -947,7 +954,7 @@ mod tests {
|
|||||||
|
|
||||||
// Even if the finished height is higher than the tip of the new chain, the reorg
|
// Even if the finished height is higher than the tip of the new chain, the reorg
|
||||||
// notification should be received
|
// notification should be received
|
||||||
exex_handle.finished_height = Some(u64::MAX);
|
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
|
||||||
|
|
||||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||||
|
|
||||||
@ -978,7 +985,7 @@ mod tests {
|
|||||||
|
|
||||||
// Even if the finished height is higher than the tip of the new chain, the reorg
|
// Even if the finished height is higher than the tip of the new chain, the reorg
|
||||||
// notification should be received
|
// notification should be received
|
||||||
exex_handle.finished_height = Some(u64::MAX);
|
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
|
||||||
|
|
||||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||||
|
|
||||||
|
|||||||
@ -41,7 +41,7 @@ use reth_node_ethereum::{
|
|||||||
EthEngineTypes, EthEvmConfig,
|
EthEngineTypes, EthEvmConfig,
|
||||||
};
|
};
|
||||||
use reth_payload_builder::noop::NoopPayloadBuilderService;
|
use reth_payload_builder::noop::NoopPayloadBuilderService;
|
||||||
use reth_primitives::{Head, SealedBlockWithSenders};
|
use reth_primitives::{BlockNumHash, Head, SealedBlockWithSenders};
|
||||||
use reth_provider::{
|
use reth_provider::{
|
||||||
providers::{BlockchainProvider, StaticFileProvider},
|
providers::{BlockchainProvider, StaticFileProvider},
|
||||||
BlockReader, ProviderFactory,
|
BlockReader, ProviderFactory,
|
||||||
@ -223,7 +223,7 @@ impl TestExExHandle {
|
|||||||
/// Asserts that the Execution Extension emitted a `FinishedHeight` event with the correct
|
/// Asserts that the Execution Extension emitted a `FinishedHeight` event with the correct
|
||||||
/// height.
|
/// height.
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
pub fn assert_event_finished_height(&mut self, height: u64) -> eyre::Result<()> {
|
pub fn assert_event_finished_height(&mut self, height: BlockNumHash) -> eyre::Result<()> {
|
||||||
let event = self.events_rx.try_recv()?;
|
let event = self.events_rx.try_recv()?;
|
||||||
assert_eq!(event, ExExEvent::FinishedHeight(height));
|
assert_eq!(event, ExExEvent::FinishedHeight(height));
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user