fix: set finalized & safe block information on startup (#9898)

Co-authored-by: Danyal Prout <me@dany.al>
This commit is contained in:
joshieDo
2024-07-30 13:06:57 +01:00
committed by GitHub
parent e3d375a8f3
commit 624f5d5614
7 changed files with 73 additions and 53 deletions

View File

@ -17,10 +17,12 @@ pub struct ChainInfoTracker {
} }
impl ChainInfoTracker { impl ChainInfoTracker {
/// Create a new chain info container for the given canonical head. /// Create a new chain info container for the given canonical head and finalized header if it
pub fn new(head: SealedHeader) -> Self { /// exists.
let (finalized_block, _) = watch::channel(None); pub fn new(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
let (safe_block, _) = watch::channel(None); let (finalized_block, _) = watch::channel(finalized.clone());
let (safe_block, _) = watch::channel(finalized);
Self { Self {
inner: Arc::new(ChainInfoInner { inner: Arc::new(ChainInfoInner {
last_forkchoice_update: RwLock::new(None), last_forkchoice_update: RwLock::new(None),

View File

@ -112,11 +112,13 @@ pub struct CanonicalInMemoryState {
} }
impl CanonicalInMemoryState { impl CanonicalInMemoryState {
/// Create a new in memory state with the given blocks, numbers, and pending state. /// Create a new in memory state with the given blocks, numbers, pending state and finalized
/// header if it exists.
pub fn new( pub fn new(
blocks: HashMap<B256, Arc<BlockState>>, blocks: HashMap<B256, Arc<BlockState>>,
numbers: HashMap<u64, B256>, numbers: HashMap<u64, B256>,
pending: Option<BlockState>, pending: Option<BlockState>,
finalized: Option<SealedHeader>,
) -> Self { ) -> Self {
let in_memory_state = InMemoryState::new(blocks, numbers, pending); let in_memory_state = InMemoryState::new(blocks, numbers, pending);
let head_state = in_memory_state.head_state(); let head_state = in_memory_state.head_state();
@ -124,7 +126,7 @@ impl CanonicalInMemoryState {
Some(state) => state.block().block().header.clone(), Some(state) => state.block().block().header.clone(),
None => SealedHeader::default(), None => SealedHeader::default(),
}; };
let chain_info_tracker = ChainInfoTracker::new(header); let chain_info_tracker = ChainInfoTracker::new(header, finalized);
let (canon_state_notification_sender, _canon_state_notification_receiver) = let (canon_state_notification_sender, _canon_state_notification_receiver) =
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);
@ -137,9 +139,10 @@ impl CanonicalInMemoryState {
Self { inner: Arc::new(inner) } Self { inner: Arc::new(inner) }
} }
/// Create a new in memory state with the given local head. /// Create a new in memory state with the given local head and finalized header
pub fn with_head(head: SealedHeader) -> Self { /// if it exists.
let chain_info_tracker = ChainInfoTracker::new(head); pub fn with_head(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
let chain_info_tracker = ChainInfoTracker::new(head, finalized);
let in_memory_state = InMemoryState::default(); let in_memory_state = InMemoryState::default();
let (canon_state_notification_sender, _canon_state_notification_receiver) = let (canon_state_notification_sender, _canon_state_notification_receiver) =
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);
@ -889,7 +892,7 @@ mod tests {
#[test] #[test]
fn test_in_memory_state_chain_update() { fn test_in_memory_state_chain_update() {
let state = CanonicalInMemoryState::new(HashMap::new(), HashMap::new(), None); let state = CanonicalInMemoryState::new(HashMap::new(), HashMap::new(), None, None);
let block1 = get_executed_block_with_number(0, B256::random()); let block1 = get_executed_block_with_number(0, B256::random());
let block2 = get_executed_block_with_number(0, B256::random()); let block2 = get_executed_block_with_number(0, B256::random());
let chain = NewCanonicalChain::Commit { new: vec![block1.clone()] }; let chain = NewCanonicalChain::Commit { new: vec![block1.clone()] };
@ -925,7 +928,7 @@ mod tests {
numbers.insert(2, block2.block().hash()); numbers.insert(2, block2.block().hash());
numbers.insert(3, block3.block().hash()); numbers.insert(3, block3.block().hash());
let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None); let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None);
let historical: StateProviderBox = Box::new(MockStateProvider); let historical: StateProviderBox = Box::new(MockStateProvider);

View File

@ -399,9 +399,10 @@ where
) )
.expect("failed to create tree"), .expect("failed to create tree"),
)); ));
let latest = self.base_config.chain_spec.genesis_header().seal_slow(); let genesis_block = self.base_config.chain_spec.genesis_header().seal_slow();
let blockchain_provider = let blockchain_provider =
BlockchainProvider::with_latest(provider_factory.clone(), tree, latest); BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None);
let pruner = Pruner::<_, ProviderFactory<_>>::new( let pruner = Pruner::<_, ProviderFactory<_>>::new(
provider_factory.clone(), provider_factory.clone(),

View File

@ -1736,7 +1736,7 @@ mod tests {
let header = chain_spec.genesis_header().seal_slow(); let header = chain_spec.genesis_header().seal_slow();
let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash()); let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header); let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None);
let (to_payload_service, payload_command_rx) = unbounded_channel(); let (to_payload_service, payload_command_rx) = unbounded_channel();
let payload_builder = PayloadBuilderHandle::new(to_payload_service); let payload_builder = PayloadBuilderHandle::new(to_payload_service);
@ -1797,7 +1797,7 @@ mod tests {
let last_executed_block = blocks.last().unwrap().clone(); let last_executed_block = blocks.last().unwrap().clone();
let pending = Some(BlockState::new(last_executed_block)); let pending = Some(BlockState::new(last_executed_block));
self.tree.canonical_in_memory_state = self.tree.canonical_in_memory_state =
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending); CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None);
self.blocks = blocks; self.blocks = blocks;
self self

View File

@ -159,7 +159,8 @@ mod tests {
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let blockchain_db = let blockchain_db =
BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default()); BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default())
.unwrap();
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs); let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
let pruner = let pruner =

View File

@ -2,10 +2,10 @@ use crate::{
providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications, BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
DatabaseProviderRO, EvmEnvProvider, HeaderProvider, ProviderError, ProviderFactory, DatabaseProviderRO, EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ProviderError,
PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory, RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory,
TransactionVariant, TransactionsProvider, WithdrawalsProvider, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
}; };
use alloy_rpc_types_engine::ForkchoiceState; use alloy_rpc_types_engine::ForkchoiceState;
use reth_chain_state::CanonicalInMemoryState; use reth_chain_state::CanonicalInMemoryState;
@ -55,14 +55,6 @@ impl<DB> Clone for BlockchainProvider2<DB> {
} }
} }
impl<DB> BlockchainProvider2<DB> {
/// Create new provider instance that wraps the database and the blockchain tree, using the
/// provided latest header to initialize the chain info tracker.
pub fn with_latest(database: ProviderFactory<DB>, latest: SealedHeader) -> Self {
Self { database, canonical_in_memory_state: CanonicalInMemoryState::with_head(latest) }
}
}
impl<DB> BlockchainProvider2<DB> impl<DB> BlockchainProvider2<DB>
where where
DB: Database, DB: Database,
@ -75,12 +67,30 @@ where
match provider.header_by_number(best.best_number)? { match provider.header_by_number(best.best_number)? {
Some(header) => { Some(header) => {
drop(provider); drop(provider);
Ok(Self::with_latest(database, header.seal(best.best_hash))) Ok(Self::with_latest(database, header.seal(best.best_hash))?)
} }
None => Err(ProviderError::HeaderNotFound(best.best_number.into())), None => Err(ProviderError::HeaderNotFound(best.best_number.into())),
} }
} }
/// Create new provider instance that wraps the database and the blockchain tree, using the
/// provided latest header to initialize the chain info tracker.
///
/// This returns a `ProviderResult` since it tries the retrieve the last finalized header from
/// `database`.
pub fn with_latest(
database: ProviderFactory<DB>,
latest: SealedHeader,
) -> ProviderResult<Self> {
let provider = database.provider()?;
let finalized_block_number = provider.last_finalized_block_number()?;
let finalized_header = provider.sealed_header(finalized_block_number)?;
Ok(Self {
database,
canonical_in_memory_state: CanonicalInMemoryState::with_head(latest, finalized_header),
})
}
/// Gets a clone of `canonical_in_memory_state`. /// Gets a clone of `canonical_in_memory_state`.
pub fn canonical_in_memory_state(&self) -> CanonicalInMemoryState { pub fn canonical_in_memory_state(&self) -> CanonicalInMemoryState {
self.canonical_in_memory_state.clone() self.canonical_in_memory_state.clone()

View File

@ -1,18 +1,18 @@
use crate::{ use crate::{
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, ChainSpecProvider, BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
ChangeSetReader, DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, EvmEnvProvider, FinalizedBlockReader, FullExecutionDataProvider, HeaderProvider, ProviderError,
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TreeViewer, StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory,
WithdrawalsProvider, TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider,
}; };
use reth_blockchain_tree_api::{ use reth_blockchain_tree_api::{
error::{CanonicalError, InsertBlockError}, error::{CanonicalError, InsertBlockError},
BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome, BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome,
InsertPayloadOk, InsertPayloadOk,
}; };
use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ChainInfoTracker}; use reth_chain_state::ChainInfoTracker;
use reth_chainspec::{ChainInfo, ChainSpec}; use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db_api::{ use reth_db_api::{
database::Database, database::Database,
@ -88,16 +88,6 @@ impl<DB> Clone for BlockchainProvider<DB> {
} }
impl<DB> BlockchainProvider<DB> { impl<DB> BlockchainProvider<DB> {
/// Create new provider instance that wraps the database and the blockchain tree, using the
/// provided latest header to initialize the chain info tracker.
pub fn with_latest(
database: ProviderFactory<DB>,
tree: Arc<dyn TreeViewer>,
latest: SealedHeader,
) -> Self {
Self { database, tree, chain_info: ChainInfoTracker::new(latest) }
}
/// Sets the treeviewer for the provider. /// Sets the treeviewer for the provider.
#[doc(hidden)] #[doc(hidden)]
pub fn with_tree(mut self, tree: Arc<dyn TreeViewer>) -> Self { pub fn with_tree(mut self, tree: Arc<dyn TreeViewer>) -> Self {
@ -110,18 +100,31 @@ impl<DB> BlockchainProvider<DB>
where where
DB: Database, DB: Database,
{ {
/// Create new provider instance that wraps the database and the blockchain tree, using the
/// provided latest header to initialize the chain info tracker, alongside the finalized header
/// if it exists.
pub fn with_blocks(
database: ProviderFactory<DB>,
tree: Arc<dyn TreeViewer>,
latest: SealedHeader,
finalized: Option<SealedHeader>,
) -> Self {
Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized) }
}
/// Create a new provider using only the database and the tree, fetching the latest header from /// Create a new provider using only the database and the tree, fetching the latest header from
/// the database to initialize the provider. /// the database to initialize the provider.
pub fn new(database: ProviderFactory<DB>, tree: Arc<dyn TreeViewer>) -> ProviderResult<Self> { pub fn new(database: ProviderFactory<DB>, tree: Arc<dyn TreeViewer>) -> ProviderResult<Self> {
let provider = database.provider()?; let provider = database.provider()?;
let best: ChainInfo = provider.chain_info()?; let best: ChainInfo = provider.chain_info()?;
match provider.header_by_number(best.best_number)? { let latest_header = provider
Some(header) => { .header_by_number(best.best_number)?
drop(provider); .ok_or(ProviderError::HeaderNotFound(best.best_number.into()))?;
Ok(Self::with_latest(database, tree, header.seal(best.best_hash)))
} let finalized_block_number = provider.last_finalized_block_number()?;
None => Err(ProviderError::HeaderNotFound(best.best_number.into())), let finalized_header = provider.sealed_header(finalized_block_number)?;
}
Ok(Self::with_blocks(database, tree, latest_header.seal(best.best_hash), finalized_header))
} }
/// Ensures that the given block number is canonical (synced) /// Ensures that the given block number is canonical (synced)