feat: NodePrimitivesProvider (#12855)

This commit is contained in:
Arsenii Kulikov
2024-11-26 16:06:55 +04:00
committed by GitHub
parent b34fb7883a
commit 26fc701814
40 changed files with 239 additions and 148 deletions

View File

@ -35,7 +35,6 @@ tokio-stream = { workspace = true, features = ["sync"] }
tracing.workspace = true
# misc
auto_impl.workspace = true
derive_more.workspace = true
metrics.workspace = true
parking_lot.workspace = true

View File

@ -1,9 +1,9 @@
//! Canonical chain state notification trait and types.
use auto_impl::auto_impl;
use derive_more::{Deref, DerefMut};
use reth_execution_types::{BlockReceipts, Chain};
use reth_primitives::{NodePrimitives, SealedBlockWithSenders, SealedHeader};
use reth_storage_api::NodePrimitivesProvider;
use std::{
pin::Pin,
sync::Arc,
@ -25,21 +25,30 @@ pub type CanonStateNotificationSender<N = reth_primitives::EthPrimitives> =
broadcast::Sender<CanonStateNotification<N>>;
/// A type that allows to register chain related event subscriptions.
#[auto_impl(&, Arc)]
pub trait CanonStateSubscriptions: Send + Sync {
pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
/// Get notified when a new canonical chain was imported.
///
/// A canonical chain be one or more blocks, a reorg or a revert.
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications;
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
/// Convenience method to get a stream of [`CanonStateNotification`].
fn canonical_state_stream(&self) -> CanonStateNotificationStream {
fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
CanonStateNotificationStream {
st: BroadcastStream::new(self.subscribe_to_canonical_state()),
}
}
}
impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
(*self).subscribe_to_canonical_state()
}
fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
(*self).canonical_state_stream()
}
}
/// A Stream of [`CanonStateNotification`].
#[derive(Debug)]
#[pin_project::pin_project]

View File

@ -14,9 +14,11 @@ use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root},
BlockBody, NodePrimitives, Receipt, Receipts, SealedBlock, SealedBlockWithSenders,
SealedHeader, Transaction, TransactionSigned, TransactionSignedEcRecovered,
BlockBody, EthPrimitives, NodePrimitives, Receipt, Receipts, SealedBlock,
SealedBlockWithSenders, SealedHeader, Transaction, TransactionSigned,
TransactionSignedEcRecovered,
};
use reth_storage_api::NodePrimitivesProvider;
use reth_trie::{root::state_root_unhashed, updates::TrieUpdates, HashedPostState};
use revm::{db::BundleState, primitives::AccountInfo};
use std::{
@ -314,6 +316,10 @@ impl TestCanonStateSubscriptions {
}
}
impl NodePrimitivesProvider for TestCanonStateSubscriptions {
type Primitives = EthPrimitives;
}
impl CanonStateSubscriptions for TestCanonStateSubscriptions {
/// Sets up a broadcast channel with a buffer size of 100.
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications {