chore: make CanonicalInMemoryState generic over sealed header and block (#12835)

This commit is contained in:
Jun Song
2024-11-25 21:54:45 +09:00
committed by GitHub
parent dcaa06a01a
commit caac226c73
7 changed files with 92 additions and 60 deletions

1
Cargo.lock generated
View File

@ -6614,6 +6614,7 @@ dependencies = [
"reth-execution-types",
"reth-metrics",
"reth-primitives",
"reth-primitives-traits",
"reth-storage-api",
"reth-testing-utils",
"reth-trie",

View File

@ -18,6 +18,7 @@ reth-errors.workspace = true
reth-execution-types.workspace = true
reth-metrics.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-storage-api.workspace = true
reth-trie.workspace = true
@ -56,12 +57,13 @@ revm.workspace = true
[features]
test-utils = [
"alloy-signer",
"alloy-signer-local",
"rand",
"revm",
"reth-chainspec/test-utils",
"reth-primitives/test-utils",
"reth-trie/test-utils",
"revm?/test-utils",
"alloy-signer",
"alloy-signer-local",
"rand",
"revm",
"reth-chainspec/test-utils",
"reth-primitives/test-utils",
"reth-trie/test-utils",
"revm?/test-utils",
"reth-primitives-traits/test-utils"
]

View File

@ -1,8 +1,9 @@
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use alloy_primitives::BlockNumber;
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_primitives::SealedHeader;
use reth_primitives::{NodePrimitives, SealedHeader};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
@ -14,17 +15,21 @@ use tokio::sync::watch;
/// Tracks the chain info: canonical head, safe block, finalized block.
#[derive(Debug, Clone)]
pub struct ChainInfoTracker {
inner: Arc<ChainInfoInner>,
pub struct ChainInfoTracker<N: NodePrimitives> {
inner: Arc<ChainInfoInner<N>>,
}
impl ChainInfoTracker {
impl<N> ChainInfoTracker<N>
where
N: NodePrimitives,
N::BlockHeader: BlockHeader,
{
/// Create a new chain info container for the given canonical head and finalized header if it
/// exists.
pub fn new(
head: SealedHeader,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
head: SealedHeader<N::BlockHeader>,
finalized: Option<SealedHeader<N::BlockHeader>>,
safe: Option<SealedHeader<N::BlockHeader>>,
) -> Self {
let (finalized_block, _) = watch::channel(finalized);
let (safe_block, _) = watch::channel(safe);
@ -33,7 +38,7 @@ impl ChainInfoTracker {
inner: Arc::new(ChainInfoInner {
last_forkchoice_update: RwLock::new(None),
last_transition_configuration_exchange: RwLock::new(None),
canonical_head_number: AtomicU64::new(head.number),
canonical_head_number: AtomicU64::new(head.number()),
canonical_head: RwLock::new(head),
safe_block,
finalized_block,
@ -44,7 +49,7 @@ impl ChainInfoTracker {
/// Returns the [`ChainInfo`] for the canonical head.
pub fn chain_info(&self) -> ChainInfo {
let inner = self.inner.canonical_head.read();
ChainInfo { best_hash: inner.hash(), best_number: inner.number }
ChainInfo { best_hash: inner.hash(), best_number: inner.number() }
}
/// Update the timestamp when we received a forkchoice update.
@ -68,17 +73,17 @@ impl ChainInfoTracker {
}
/// Returns the canonical head of the chain.
pub fn get_canonical_head(&self) -> SealedHeader {
pub fn get_canonical_head(&self) -> SealedHeader<N::BlockHeader> {
self.inner.canonical_head.read().clone()
}
/// Returns the safe header of the chain.
pub fn get_safe_header(&self) -> Option<SealedHeader> {
pub fn get_safe_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.inner.safe_block.borrow().clone()
}
/// Returns the finalized header of the chain.
pub fn get_finalized_header(&self) -> Option<SealedHeader> {
pub fn get_finalized_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.inner.finalized_block.borrow().clone()
}
@ -104,8 +109,8 @@ impl ChainInfoTracker {
}
/// Sets the canonical head of the chain.
pub fn set_canonical_head(&self, header: SealedHeader) {
let number = header.number;
pub fn set_canonical_head(&self, header: SealedHeader<N::BlockHeader>) {
let number = header.number();
*self.inner.canonical_head.write() = header;
// also update the atomic number.
@ -113,7 +118,7 @@ impl ChainInfoTracker {
}
/// Sets the safe header of the chain.
pub fn set_safe(&self, header: SealedHeader) {
pub fn set_safe(&self, header: SealedHeader<N::BlockHeader>) {
self.inner.safe_block.send_if_modified(|current_header| {
if current_header.as_ref().map(SealedHeader::hash) != Some(header.hash()) {
let _ = current_header.replace(header);
@ -125,7 +130,7 @@ impl ChainInfoTracker {
}
/// Sets the finalized header of the chain.
pub fn set_finalized(&self, header: SealedHeader) {
pub fn set_finalized(&self, header: SealedHeader<N::BlockHeader>) {
self.inner.finalized_block.send_if_modified(|current_header| {
if current_header.as_ref().map(SealedHeader::hash) != Some(header.hash()) {
let _ = current_header.replace(header);
@ -137,19 +142,21 @@ impl ChainInfoTracker {
}
/// Subscribe to the finalized block.
pub fn subscribe_finalized_block(&self) -> watch::Receiver<Option<SealedHeader>> {
pub fn subscribe_finalized_block(
&self,
) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
self.inner.finalized_block.subscribe()
}
/// Subscribe to the safe block.
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader>> {
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
self.inner.safe_block.subscribe()
}
}
/// Container type for all chain info fields
#[derive(Debug)]
struct ChainInfoInner {
struct ChainInfoInner<N: NodePrimitives = reth_primitives::EthPrimitives> {
/// Timestamp when we received the last fork choice update.
///
/// This is mainly used to track if we're connected to a beacon node.
@ -161,16 +168,17 @@ struct ChainInfoInner {
/// Tracks the number of the `canonical_head`.
canonical_head_number: AtomicU64,
/// The canonical head of the chain.
canonical_head: RwLock<SealedHeader>,
canonical_head: RwLock<SealedHeader<N::BlockHeader>>,
/// The block that the beacon node considers safe.
safe_block: watch::Sender<Option<SealedHeader>>,
safe_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
/// The block that the beacon node considers finalized.
finalized_block: watch::Sender<Option<SealedHeader>>,
finalized_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
}
#[cfg(test)]
mod tests {
use super::*;
use reth_primitives::EthPrimitives;
use reth_testing_utils::{generators, generators::random_header};
#[test]
@ -180,7 +188,8 @@ mod tests {
let header = random_header(&mut rng, 10, None);
// Create a new chain info tracker with the header
let tracker = ChainInfoTracker::new(header.clone(), None, None);
let tracker: ChainInfoTracker<EthPrimitives> =
ChainInfoTracker::new(header.clone(), None, None);
// Fetch the chain information from the tracker
let chain_info = tracker.chain_info();
@ -197,7 +206,7 @@ mod tests {
let header = random_header(&mut rng, 10, None);
// Create a new chain info tracker with the header
let tracker = ChainInfoTracker::new(header, None, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header, None, None);
// Assert that there has been no forkchoice update yet (the timestamp is None)
assert!(tracker.last_forkchoice_update_received_at().is_none());
@ -216,7 +225,7 @@ mod tests {
let header = random_header(&mut rng, 10, None);
// Create a new chain info tracker with the header
let tracker = ChainInfoTracker::new(header, None, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header, None, None);
// Assert that there has been no transition configuration exchange yet (the timestamp is
// None)
@ -239,7 +248,7 @@ mod tests {
let header2 = random_header(&mut rng, 20, None);
// Create a new chain info tracker with the first header
let tracker = ChainInfoTracker::new(header1, None, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header1, None, None);
// Set the second header as the canonical head of the tracker
tracker.set_canonical_head(header2.clone());
@ -260,7 +269,7 @@ mod tests {
let header2 = random_header(&mut rng, 20, None);
// Create a new chain info tracker with the first header (header1)
let tracker = ChainInfoTracker::new(header1, None, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header1, None, None);
// Call the set_safe method with the second header (header2)
tracker.set_safe(header2.clone());
@ -306,7 +315,7 @@ mod tests {
let header3 = random_header(&mut rng, 30, None);
// Create a new chain info tracker with the first header
let tracker = ChainInfoTracker::new(header1, None, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header1, None, None);
// Initial state: finalize header should be None
assert!(tracker.get_finalized_header().is_none());
@ -343,7 +352,7 @@ mod tests {
let finalized_header = random_header(&mut rng, 10, None);
// Create a new chain info tracker with the finalized header
let tracker =
let tracker: ChainInfoTracker<EthPrimitives> =
ChainInfoTracker::new(finalized_header.clone(), Some(finalized_header.clone()), None);
// Assert that the BlockNumHash returned matches the finalized header
@ -357,7 +366,8 @@ mod tests {
let safe_header = random_header(&mut rng, 10, None);
// Create a new chain info tracker with the safe header
let tracker = ChainInfoTracker::new(safe_header.clone(), None, None);
let tracker: ChainInfoTracker<EthPrimitives> =
ChainInfoTracker::new(safe_header.clone(), None, None);
tracker.set_safe(safe_header.clone());
// Assert that the BlockNumHash returned matches the safe header

View File

@ -134,7 +134,7 @@ impl<N: NodePrimitives> InMemoryState<N> {
pub(crate) struct CanonicalInMemoryStateInner<N: NodePrimitives> {
/// Tracks certain chain information, such as the canonical head, safe head, and finalized
/// head.
pub(crate) chain_info_tracker: ChainInfoTracker,
pub(crate) chain_info_tracker: ChainInfoTracker<N>,
/// Tracks blocks at the tip of the chain that have not been persisted to disk yet.
pub(crate) in_memory_state: InMemoryState<N>,
/// A broadcast stream that emits events when the canonical chain is updated.
@ -158,6 +158,11 @@ impl<N: NodePrimitives> CanonicalInMemoryStateInner<N> {
}
}
type PendingBlockAndReceipts<N> = (
SealedBlock<reth_primitives_traits::HeaderTy<N>, reth_primitives_traits::BodyTy<N>>,
Vec<reth_primitives_traits::ReceiptTy<N>>,
);
/// This type is responsible for providing the blocks, receipts, and state for
/// all canonical blocks not on disk yet and keeps track of the block range that
/// is in memory.
@ -166,15 +171,21 @@ pub struct CanonicalInMemoryState<N: NodePrimitives = reth_primitives::EthPrimit
pub(crate) inner: Arc<CanonicalInMemoryStateInner<N>>,
}
impl<N: NodePrimitives> CanonicalInMemoryState<N> {
impl<N> CanonicalInMemoryState<N>
where
N: NodePrimitives<
BlockHeader = alloy_consensus::Header,
BlockBody = reth_primitives::BlockBody,
>,
{
/// Create a new in-memory state with the given blocks, numbers, pending state, and optional
/// finalized header.
pub fn new(
blocks: HashMap<B256, Arc<BlockState<N>>>,
numbers: BTreeMap<u64, B256>,
pending: Option<BlockState<N>>,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
finalized: Option<SealedHeader<N::BlockHeader>>,
safe: Option<SealedHeader<N::BlockHeader>>,
) -> Self {
let in_memory_state = InMemoryState::new(blocks, numbers, pending);
let header = in_memory_state
@ -201,9 +212,9 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// Create a new in memory state with the given local head and finalized header
/// if it exists.
pub fn with_head(
head: SealedHeader,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
head: SealedHeader<N::BlockHeader>,
finalized: Option<SealedHeader<N::BlockHeader>>,
safe: Option<SealedHeader<N::BlockHeader>>,
) -> Self {
let chain_info_tracker = ChainInfoTracker::new(head, finalized, safe);
let in_memory_state = InMemoryState::default();
@ -224,7 +235,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
}
/// Returns the header corresponding to the given hash.
pub fn header_by_hash(&self, hash: B256) -> Option<SealedHeader> {
pub fn header_by_hash(&self, hash: B256) -> Option<SealedHeader<N::BlockHeader>> {
self.state_by_hash(hash).map(|block| block.block_ref().block.header.clone())
}
@ -427,37 +438,37 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
}
/// Canonical head setter.
pub fn set_canonical_head(&self, header: SealedHeader) {
pub fn set_canonical_head(&self, header: SealedHeader<N::BlockHeader>) {
self.inner.chain_info_tracker.set_canonical_head(header);
}
/// Safe head setter.
pub fn set_safe(&self, header: SealedHeader) {
pub fn set_safe(&self, header: SealedHeader<N::BlockHeader>) {
self.inner.chain_info_tracker.set_safe(header);
}
/// Finalized head setter.
pub fn set_finalized(&self, header: SealedHeader) {
pub fn set_finalized(&self, header: SealedHeader<N::BlockHeader>) {
self.inner.chain_info_tracker.set_finalized(header);
}
/// Canonical head getter.
pub fn get_canonical_head(&self) -> SealedHeader {
pub fn get_canonical_head(&self) -> SealedHeader<N::BlockHeader> {
self.inner.chain_info_tracker.get_canonical_head()
}
/// Finalized header getter.
pub fn get_finalized_header(&self) -> Option<SealedHeader> {
pub fn get_finalized_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.inner.chain_info_tracker.get_finalized_header()
}
/// Safe header getter.
pub fn get_safe_header(&self) -> Option<SealedHeader> {
pub fn get_safe_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.inner.chain_info_tracker.get_safe_header()
}
/// Returns the `SealedHeader` corresponding to the pending state.
pub fn pending_sealed_header(&self) -> Option<SealedHeader> {
pub fn pending_sealed_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.pending_state().map(|h| h.block_ref().block().header.clone())
}
@ -467,7 +478,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
}
/// Returns the `SealedBlock` corresponding to the pending state.
pub fn pending_block(&self) -> Option<SealedBlock> {
pub fn pending_block(&self) -> Option<SealedBlock<N::BlockHeader, N::BlockBody>> {
self.pending_state().map(|block_state| block_state.block_ref().block().clone())
}
@ -479,7 +490,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// Returns a tuple with the `SealedBlock` corresponding to the pending
/// state and a vector of its `Receipt`s.
pub fn pending_block_and_receipts(&self) -> Option<(SealedBlock, Vec<N::Receipt>)> {
pub fn pending_block_and_receipts(&self) -> Option<PendingBlockAndReceipts<N>> {
self.pending_state().map(|block_state| {
(block_state.block_ref().block().clone(), block_state.executed_block_receipts())
})
@ -491,12 +502,14 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
}
/// Subscribe to new safe block events.
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader>> {
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
self.inner.chain_info_tracker.subscribe_safe_block()
}
/// Subscribe to new finalized block events.
pub fn subscribe_finalized_block(&self) -> watch::Receiver<Option<SealedHeader>> {
pub fn subscribe_finalized_block(
&self,
) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
self.inner.chain_info_tracker.subscribe_finalized_block()
}

View File

@ -78,7 +78,7 @@ pub use size::InMemorySize;
/// Node traits
pub mod node;
pub use node::{FullNodePrimitives, NodePrimitives, ReceiptTy};
pub use node::{BodyTy, FullNodePrimitives, HeaderTy, NodePrimitives, ReceiptTy};
/// Helper trait that requires arbitrary implementation if the feature is enabled.
#[cfg(any(feature = "test-utils", feature = "arbitrary"))]

View File

@ -109,5 +109,11 @@ impl<T> FullNodePrimitives for T where
{
}
/// Helper adapter type for accessing [`NodePrimitives`] receipt type.
/// Helper adapter type for accessing [`NodePrimitives`] block header types.
pub type HeaderTy<N> = <N as NodePrimitives>::BlockHeader;
/// Helper adapter type for accessing [`NodePrimitives`] block body types.
pub type BodyTy<N> = <N as NodePrimitives>::BlockBody;
/// Helper adapter type for accessing [`NodePrimitives`] receipt types.
pub type ReceiptTy<N> = <N as NodePrimitives>::Receipt;

View File

@ -117,7 +117,7 @@ pub struct BlockchainProvider<N: NodeTypesWithDB> {
/// The blockchain tree instance.
tree: Arc<dyn TreeViewer>,
/// Tracks the chain info wrt forkchoice updates
chain_info: ChainInfoTracker,
chain_info: ChainInfoTracker<reth_primitives::EthPrimitives>,
}
impl<N: ProviderNodeTypes> Clone for BlockchainProvider<N> {