mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore: move chain state notifications to reth-chain-state crate (#9730)
This commit is contained in:
@ -22,9 +22,16 @@ revm = { workspace = true, optional = true}
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
|
||||
tokio-stream = { workspace = true, features = ["sync"] }
|
||||
|
||||
# tracing
|
||||
tracing.workspace = true
|
||||
|
||||
# misc
|
||||
auto_impl.workspace = true
|
||||
derive_more.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pin-project.workspace = true
|
||||
rand = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@ -14,6 +14,13 @@ pub use in_memory::*;
|
||||
mod chain_info;
|
||||
pub use chain_info::ChainInfoTracker;
|
||||
|
||||
mod notifications;
|
||||
pub use notifications::{
|
||||
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
|
||||
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
|
||||
ForkChoiceSubscriptions,
|
||||
};
|
||||
|
||||
#[cfg(any(test, feature = "test-utils"))]
|
||||
/// Common test helpers
|
||||
pub mod test_utils;
|
||||
|
||||
183
crates/chain-state/src/notifications.rs
Normal file
183
crates/chain-state/src/notifications.rs
Normal file
@ -0,0 +1,183 @@
|
||||
//! Canonical chain state notification trait and types.
|
||||
|
||||
use auto_impl::auto_impl;
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use reth_execution_types::{BlockReceipts, Chain};
|
||||
use reth_primitives::{SealedBlockWithSenders, SealedHeader};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio_stream::{wrappers::BroadcastStream, Stream};
|
||||
use tracing::debug;
|
||||
|
||||
/// Type alias for a receiver that receives [`CanonStateNotification`]
|
||||
pub type CanonStateNotifications = broadcast::Receiver<CanonStateNotification>;
|
||||
|
||||
/// Type alias for a sender that sends [`CanonStateNotification`]
|
||||
pub type CanonStateNotificationSender = broadcast::Sender<CanonStateNotification>;
|
||||
|
||||
/// A type that allows to register chain related event subscriptions.
|
||||
#[auto_impl(&, Arc)]
|
||||
pub trait CanonStateSubscriptions: Send + Sync {
|
||||
/// Get notified when a new canonical chain was imported.
|
||||
///
|
||||
/// A canonical chain be one or more blocks, a reorg or a revert.
|
||||
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications;
|
||||
|
||||
/// Convenience method to get a stream of [`CanonStateNotification`].
|
||||
fn canonical_state_stream(&self) -> CanonStateNotificationStream {
|
||||
CanonStateNotificationStream {
|
||||
st: BroadcastStream::new(self.subscribe_to_canonical_state()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A Stream of [`CanonStateNotification`].
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct CanonStateNotificationStream {
|
||||
#[pin]
|
||||
st: BroadcastStream<CanonStateNotification>,
|
||||
}
|
||||
|
||||
impl Stream for CanonStateNotificationStream {
|
||||
type Item = CanonStateNotification;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
return match ready!(self.as_mut().project().st.poll_next(cx)) {
|
||||
Some(Ok(notification)) => Poll::Ready(Some(notification)),
|
||||
Some(Err(err)) => {
|
||||
debug!(%err, "canonical state notification stream lagging behind");
|
||||
continue
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Chain action that is triggered when a new block is imported or old block is reverted.
|
||||
/// and will return all `ExecutionOutcome` and
|
||||
/// [`reth_primitives::SealedBlockWithSenders`] of both reverted and committed blocks.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum CanonStateNotification {
|
||||
/// Chain got extended without reorg and only new chain is returned.
|
||||
Commit {
|
||||
/// The newly extended chain.
|
||||
new: Arc<Chain>,
|
||||
},
|
||||
/// Chain reorgs and both old and new chain are returned.
|
||||
/// Revert is just a subset of reorg where the new chain is empty.
|
||||
Reorg {
|
||||
/// The old chain before reorganization.
|
||||
old: Arc<Chain>,
|
||||
/// The new chain after reorganization.
|
||||
new: Arc<Chain>,
|
||||
},
|
||||
}
|
||||
|
||||
// For one reason or another, the compiler can't derive PartialEq for CanonStateNotification.
|
||||
// so we are forced to implement it manually.
|
||||
impl PartialEq for CanonStateNotification {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::Reorg { old: old1, new: new1 }, Self::Reorg { old: old2, new: new2 }) => {
|
||||
old1 == old2 && new1 == new2
|
||||
}
|
||||
(Self::Commit { new: new1 }, Self::Commit { new: new2 }) => new1 == new2,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CanonStateNotification {
|
||||
/// Get old chain if any.
|
||||
pub fn reverted(&self) -> Option<Arc<Chain>> {
|
||||
match self {
|
||||
Self::Commit { .. } => None,
|
||||
Self::Reorg { old, .. } => Some(old.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the new chain if any.
|
||||
///
|
||||
/// Returns the new committed [Chain] for [`Self::Reorg`] and [`Self::Commit`] variants.
|
||||
pub fn committed(&self) -> Arc<Chain> {
|
||||
match self {
|
||||
Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the new tip of the chain.
|
||||
///
|
||||
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
|
||||
/// 1 new block.
|
||||
pub fn tip(&self) -> &SealedBlockWithSenders {
|
||||
match self {
|
||||
Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return receipt with its block number and transaction hash.
|
||||
///
|
||||
/// Last boolean is true if receipt is from reverted block.
|
||||
pub fn block_receipts(&self) -> Vec<(BlockReceipts, bool)> {
|
||||
let mut receipts = Vec::new();
|
||||
|
||||
// get old receipts
|
||||
if let Some(old) = self.reverted() {
|
||||
receipts
|
||||
.extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
|
||||
}
|
||||
// get new receipts
|
||||
receipts.extend(
|
||||
self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
|
||||
);
|
||||
receipts
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper around a broadcast receiver that receives fork choice notifications.
|
||||
#[derive(Debug, Deref, DerefMut)]
|
||||
pub struct ForkChoiceNotifications(broadcast::Receiver<SealedHeader>);
|
||||
|
||||
/// A trait that allows to register to fork choice related events
|
||||
/// and get notified when a new fork choice is available.
|
||||
pub trait ForkChoiceSubscriptions: Send + Sync {
|
||||
/// Get notified when a new head of the chain is selected.
|
||||
fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications;
|
||||
|
||||
/// Convenience method to get a stream of the new head of the chain.
|
||||
fn fork_choice_stream(&self) -> ForkChoiceStream {
|
||||
ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) }
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream of the fork choices in the form of [`SealedHeader`].
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct ForkChoiceStream {
|
||||
#[pin]
|
||||
st: BroadcastStream<SealedHeader>,
|
||||
}
|
||||
|
||||
impl Stream for ForkChoiceStream {
|
||||
type Item = SealedHeader;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
return match ready!(self.as_mut().project().st.poll_next(cx)) {
|
||||
Some(Ok(notification)) => Poll::Ready(Some(notification)),
|
||||
Some(Err(err)) => {
|
||||
debug!(%err, "finalized header notification stream lagging behind");
|
||||
continue
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,12 +1,19 @@
|
||||
use crate::in_memory::ExecutedBlock;
|
||||
use crate::{
|
||||
in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications,
|
||||
CanonStateSubscriptions,
|
||||
};
|
||||
use rand::Rng;
|
||||
use reth_execution_types::ExecutionOutcome;
|
||||
use reth_execution_types::{Chain, ExecutionOutcome};
|
||||
use reth_primitives::{
|
||||
Address, Block, BlockNumber, Receipts, Requests, SealedBlockWithSenders, TransactionSigned,
|
||||
};
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState};
|
||||
use revm::db::BundleState;
|
||||
use std::{ops::Range, sync::Arc};
|
||||
use std::{
|
||||
ops::Range,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tokio::sync::broadcast::{self, Sender};
|
||||
|
||||
fn get_executed_block(block_number: BlockNumber, receipts: Receipts) -> ExecutedBlock {
|
||||
let mut block = Block::default();
|
||||
@ -50,3 +57,34 @@ pub fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBloc
|
||||
pub fn get_executed_blocks(range: Range<u64>) -> impl Iterator<Item = ExecutedBlock> {
|
||||
range.map(get_executed_block_with_number)
|
||||
}
|
||||
|
||||
/// A test `ChainEventSubscriptions`
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct TestCanonStateSubscriptions {
|
||||
canon_notif_tx: Arc<Mutex<Vec<Sender<CanonStateNotification>>>>,
|
||||
}
|
||||
|
||||
impl TestCanonStateSubscriptions {
|
||||
/// Adds new block commit to the queue that can be consumed with
|
||||
/// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`]
|
||||
pub fn add_next_commit(&self, new: Arc<Chain>) {
|
||||
let event = CanonStateNotification::Commit { new };
|
||||
self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
|
||||
}
|
||||
|
||||
/// Adds reorg to the queue that can be consumed with
|
||||
/// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`]
|
||||
pub fn add_next_reorg(&self, old: Arc<Chain>, new: Arc<Chain>) {
|
||||
let event = CanonStateNotification::Reorg { old, new };
|
||||
self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
|
||||
}
|
||||
}
|
||||
|
||||
impl CanonStateSubscriptions for TestCanonStateSubscriptions {
|
||||
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications {
|
||||
let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100);
|
||||
self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx);
|
||||
|
||||
canon_notif_rx
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user