mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
feat: store safe block num as well (#11648)
This commit is contained in:
@ -7,7 +7,7 @@ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
|
||||
use reth_node_types::NodeTypesWithDB;
|
||||
use reth_primitives::StaticFileSegment;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
|
||||
providers::ProviderNodeTypes, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory,
|
||||
StaticFileProviderFactory, StatsReader,
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
|
||||
@ -21,9 +21,13 @@ pub struct ChainInfoTracker {
|
||||
impl ChainInfoTracker {
|
||||
/// Create a new chain info container for the given canonical head and finalized header if it
|
||||
/// exists.
|
||||
pub fn new(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
|
||||
pub fn new(
|
||||
head: SealedHeader,
|
||||
finalized: Option<SealedHeader>,
|
||||
safe: Option<SealedHeader>,
|
||||
) -> Self {
|
||||
let (finalized_block, _) = watch::channel(finalized);
|
||||
let (safe_block, _) = watch::channel(None);
|
||||
let (safe_block, _) = watch::channel(safe);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(ChainInfoInner {
|
||||
|
||||
@ -173,12 +173,13 @@ impl CanonicalInMemoryState {
|
||||
numbers: BTreeMap<u64, B256>,
|
||||
pending: Option<BlockState>,
|
||||
finalized: Option<SealedHeader>,
|
||||
safe: Option<SealedHeader>,
|
||||
) -> Self {
|
||||
let in_memory_state = InMemoryState::new(blocks, numbers, pending);
|
||||
let header = in_memory_state
|
||||
.head_state()
|
||||
.map_or_else(SealedHeader::default, |state| state.block_ref().block().header.clone());
|
||||
let chain_info_tracker = ChainInfoTracker::new(header, finalized);
|
||||
let chain_info_tracker = ChainInfoTracker::new(header, finalized, safe);
|
||||
let (canon_state_notification_sender, _) =
|
||||
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);
|
||||
|
||||
@ -193,13 +194,17 @@ impl CanonicalInMemoryState {
|
||||
|
||||
/// Create an empty state.
|
||||
pub fn empty() -> Self {
|
||||
Self::new(HashMap::default(), BTreeMap::new(), None, None)
|
||||
Self::new(HashMap::default(), BTreeMap::new(), None, None, None)
|
||||
}
|
||||
|
||||
/// Create a new in memory state with the given local head and finalized header
|
||||
/// if it exists.
|
||||
pub fn with_head(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
|
||||
let chain_info_tracker = ChainInfoTracker::new(head, finalized);
|
||||
pub fn with_head(
|
||||
head: SealedHeader,
|
||||
finalized: Option<SealedHeader>,
|
||||
safe: Option<SealedHeader>,
|
||||
) -> Self {
|
||||
let chain_info_tracker = ChainInfoTracker::new(head, finalized, safe);
|
||||
let in_memory_state = InMemoryState::default();
|
||||
let (canon_state_notification_sender, _) =
|
||||
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);
|
||||
@ -1255,7 +1260,7 @@ mod tests {
|
||||
numbers.insert(2, block2.block().hash());
|
||||
numbers.insert(3, block3.block().hash());
|
||||
|
||||
let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None);
|
||||
let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None, None);
|
||||
|
||||
let historical: StateProviderBox = Box::new(MockStateProvider);
|
||||
|
||||
@ -1297,7 +1302,7 @@ mod tests {
|
||||
let mut numbers = BTreeMap::new();
|
||||
numbers.insert(1, hash);
|
||||
|
||||
let state = CanonicalInMemoryState::new(blocks, numbers, None, None);
|
||||
let state = CanonicalInMemoryState::new(blocks, numbers, None, None, None);
|
||||
let chain: Vec<_> = state.canonical_chain().collect();
|
||||
|
||||
assert_eq!(chain.len(), 1);
|
||||
|
||||
@ -17,7 +17,7 @@ use reth_node_builder::{NodeTypesWithDB, NodeTypesWithEngine};
|
||||
use reth_node_core::args::NetworkArgs;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainSpecProvider,
|
||||
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
|
||||
ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune::PruneModes;
|
||||
use reth_stages::{
|
||||
|
||||
@ -398,8 +398,13 @@ where
|
||||
let (header, seal) = sealed.into_parts();
|
||||
let genesis_block = SealedHeader::new(header, seal);
|
||||
|
||||
let blockchain_provider =
|
||||
BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None);
|
||||
let blockchain_provider = BlockchainProvider::with_blocks(
|
||||
provider_factory.clone(),
|
||||
tree,
|
||||
genesis_block,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let pruner = Pruner::new_with_factory(
|
||||
provider_factory.clone(),
|
||||
|
||||
@ -4,7 +4,7 @@ use reth_chain_state::ExecutedBlock;
|
||||
use reth_errors::ProviderError;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
|
||||
DatabaseProviderFactory, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
|
||||
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
|
||||
use reth_stages_api::{MetricEvent, MetricEventsSender};
|
||||
@ -97,6 +97,11 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
|
||||
provider.save_finalized_block_number(finalized_block)?;
|
||||
provider.commit()?;
|
||||
}
|
||||
PersistenceAction::SaveSafeBlock(safe_block) => {
|
||||
let provider = self.provider.database_provider_rw()?;
|
||||
provider.save_safe_block_number(safe_block)?;
|
||||
provider.commit()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@ -176,6 +181,9 @@ pub enum PersistenceAction {
|
||||
|
||||
/// Update the persisted finalized block on disk
|
||||
SaveFinalizedBlock(u64),
|
||||
|
||||
/// Update the persisted safe block on disk
|
||||
SaveSafeBlock(u64),
|
||||
}
|
||||
|
||||
/// A handle to the persistence service
|
||||
@ -251,6 +259,14 @@ impl PersistenceHandle {
|
||||
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
|
||||
}
|
||||
|
||||
/// Persists the finalized block number on disk.
|
||||
pub fn save_safe_block_number(
|
||||
&self,
|
||||
safe_block: u64,
|
||||
) -> Result<(), SendError<PersistenceAction>> {
|
||||
self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
|
||||
}
|
||||
|
||||
/// Tells the persistence service to remove blocks above a certain block number. The removed
|
||||
/// blocks are returned by the service.
|
||||
///
|
||||
|
||||
@ -2401,8 +2401,13 @@ where
|
||||
// if the safe block is not known, we can't update the safe block
|
||||
return Err(OnForkChoiceUpdated::invalid_state())
|
||||
}
|
||||
Ok(Some(finalized)) => {
|
||||
self.canonical_in_memory_state.set_safe(finalized);
|
||||
Ok(Some(safe)) => {
|
||||
if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
|
||||
// we're also persisting the safe block on disk so we can reload it on
|
||||
// restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
|
||||
let _ = self.persistence.save_safe_block_number(safe.number);
|
||||
self.canonical_in_memory_state.set_safe(safe);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(target: "engine::tree", %err, "Failed to fetch safe block header");
|
||||
@ -2680,7 +2685,7 @@ mod tests {
|
||||
let (header, seal) = sealed.into_parts();
|
||||
let header = SealedHeader::new(header, seal);
|
||||
let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None);
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
|
||||
|
||||
let (to_payload_service, _payload_command_rx) = unbounded_channel();
|
||||
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
|
||||
@ -2744,7 +2749,7 @@ mod tests {
|
||||
let last_executed_block = blocks.last().unwrap().clone();
|
||||
let pending = Some(BlockState::new(last_executed_block));
|
||||
self.tree.canonical_in_memory_state =
|
||||
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None);
|
||||
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
|
||||
|
||||
self.blocks = blocks.clone();
|
||||
self.persist_blocks(
|
||||
|
||||
@ -7,8 +7,8 @@ pub use event::*;
|
||||
use futures_util::Future;
|
||||
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
|
||||
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StageCheckpointReader,
|
||||
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
|
||||
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
|
||||
StageCheckpointWriter, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune::PrunerBuilder;
|
||||
|
||||
@ -416,6 +416,8 @@ tables! {
|
||||
pub enum ChainStateKey {
|
||||
/// Last finalized block key
|
||||
LastFinalizedBlock,
|
||||
/// Last finalized block key
|
||||
LastSafeBlockBlock,
|
||||
}
|
||||
|
||||
impl Encode for ChainStateKey {
|
||||
@ -424,16 +426,17 @@ impl Encode for ChainStateKey {
|
||||
fn encode(self) -> Self::Encoded {
|
||||
match self {
|
||||
Self::LastFinalizedBlock => [0],
|
||||
Self::LastSafeBlockBlock => [1],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for ChainStateKey {
|
||||
fn decode(value: &[u8]) -> Result<Self, reth_db_api::DatabaseError> {
|
||||
if value == [0] {
|
||||
Ok(Self::LastFinalizedBlock)
|
||||
} else {
|
||||
Err(reth_db_api::DatabaseError::Decode)
|
||||
match value {
|
||||
[0] => Ok(Self::LastFinalizedBlock),
|
||||
[1] => Ok(Self::LastSafeBlockBlock),
|
||||
_ => Err(reth_db_api::DatabaseError::Decode),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
|
||||
BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications,
|
||||
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
|
||||
DatabaseProviderRO, EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ProviderError,
|
||||
CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader,
|
||||
DatabaseProviderFactory, DatabaseProviderRO, EvmEnvProvider, HeaderProvider, ProviderError,
|
||||
ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
|
||||
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
|
||||
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
|
||||
@ -93,9 +93,23 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
|
||||
.map(|num| provider.sealed_header(num))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
let safe_header = provider
|
||||
.last_safe_block_number()?
|
||||
.or_else(|| {
|
||||
// for the purpose of this we can also use the finalized block if we don't have the
|
||||
// safe block
|
||||
provider.last_finalized_block_number().ok().flatten()
|
||||
})
|
||||
.map(|num| provider.sealed_header(num))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
Ok(Self {
|
||||
database,
|
||||
canonical_in_memory_state: CanonicalInMemoryState::with_head(latest, finalized_header),
|
||||
canonical_in_memory_state: CanonicalInMemoryState::with_head(
|
||||
latest,
|
||||
finalized_header,
|
||||
safe_header,
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -7,13 +7,14 @@ use crate::{
|
||||
},
|
||||
writer::UnifiedStorageWriter,
|
||||
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
|
||||
BlockReader, BlockWriter, BundleStateInit, DBProvider, EvmEnvProvider, FinalizedBlockReader,
|
||||
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
|
||||
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
|
||||
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
|
||||
StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter,
|
||||
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
|
||||
TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
|
||||
BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
|
||||
DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
|
||||
HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, LatestStateProvider,
|
||||
OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
RequestsProvider, RevertsInit, StageCheckpointReader, StateChangeWriter, StateProviderBox,
|
||||
StateReader, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
|
||||
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
|
||||
TrieWriter, WithdrawalsProvider,
|
||||
};
|
||||
use alloy_eips::BlockHashOrNumber;
|
||||
use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
|
||||
@ -3596,7 +3597,7 @@ impl<TX: DbTx, Spec: Send + Sync> StatsReader for DatabaseProvider<TX, Spec> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx, Spec: Send + Sync> FinalizedBlockReader for DatabaseProvider<TX, Spec> {
|
||||
impl<TX: DbTx, Spec: Send + Sync> ChainStateBlockReader for DatabaseProvider<TX, Spec> {
|
||||
fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
|
||||
let mut finalized_blocks = self
|
||||
.tx
|
||||
@ -3608,14 +3609,32 @@ impl<TX: DbTx, Spec: Send + Sync> FinalizedBlockReader for DatabaseProvider<TX,
|
||||
let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
|
||||
Ok(last_finalized_block_number)
|
||||
}
|
||||
|
||||
fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
|
||||
let mut finalized_blocks = self
|
||||
.tx
|
||||
.cursor_read::<tables::ChainState>()?
|
||||
.walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
|
||||
.take(1)
|
||||
.collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
|
||||
|
||||
let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
|
||||
Ok(last_finalized_block_number)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, Spec: Send + Sync> FinalizedBlockWriter for DatabaseProvider<TX, Spec> {
|
||||
impl<TX: DbTxMut, Spec: Send + Sync> ChainStateBlockWriter for DatabaseProvider<TX, Spec> {
|
||||
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
|
||||
Ok(self
|
||||
.tx
|
||||
.put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
|
||||
}
|
||||
|
||||
fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
|
||||
Ok(self
|
||||
.tx
|
||||
.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx + 'static, Spec: Send + Sync + 'static> DBProvider for DatabaseProvider<TX, Spec> {
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
use crate::{
|
||||
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
|
||||
BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
|
||||
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
|
||||
EvmEnvProvider, FinalizedBlockReader, FullExecutionDataProvider, HeaderProvider, ProviderError,
|
||||
PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
|
||||
CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader,
|
||||
DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider, HeaderProvider,
|
||||
ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
|
||||
StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory,
|
||||
TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider,
|
||||
};
|
||||
@ -109,8 +109,9 @@ impl<N: ProviderNodeTypes> BlockchainProvider<N> {
|
||||
tree: Arc<dyn TreeViewer>,
|
||||
latest: SealedHeader,
|
||||
finalized: Option<SealedHeader>,
|
||||
safe: Option<SealedHeader>,
|
||||
) -> Self {
|
||||
Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized) }
|
||||
Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized, safe) }
|
||||
}
|
||||
|
||||
/// Create a new provider using only the database and the tree, fetching the latest header from
|
||||
@ -128,11 +129,18 @@ impl<N: ProviderNodeTypes> BlockchainProvider<N> {
|
||||
.transpose()?
|
||||
.flatten();
|
||||
|
||||
let safe_header = provider
|
||||
.last_safe_block_number()?
|
||||
.map(|num| provider.sealed_header(num))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
|
||||
Ok(Self::with_blocks(
|
||||
database,
|
||||
tree,
|
||||
SealedHeader::new(latest_header, best.best_hash),
|
||||
finalized_header,
|
||||
safe_header,
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@ -1,16 +1,23 @@
|
||||
use alloy_primitives::BlockNumber;
|
||||
use reth_errors::ProviderResult;
|
||||
|
||||
/// Functionality to read the last known finalized block from the database.
|
||||
pub trait FinalizedBlockReader: Send + Sync {
|
||||
/// Functionality to read the last known chain blocks from the database.
|
||||
pub trait ChainStateBlockReader: Send + Sync {
|
||||
/// Returns the last finalized block number.
|
||||
///
|
||||
/// If no finalized block has been written yet, this returns `None`.
|
||||
fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>>;
|
||||
/// Returns the last safe block number.
|
||||
///
|
||||
/// If no safe block has been written yet, this returns `None`.
|
||||
fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>>;
|
||||
}
|
||||
|
||||
/// Functionality to write the last known finalized block to the database.
|
||||
pub trait FinalizedBlockWriter: Send + Sync {
|
||||
/// Functionality to write the last known chain blocks to the database.
|
||||
pub trait ChainStateBlockWriter: Send + Sync {
|
||||
/// Saves the given finalized block number in the DB.
|
||||
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>;
|
||||
|
||||
/// Saves the given safe block number in the DB.
|
||||
fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
@ -42,4 +42,4 @@ mod tree_viewer;
|
||||
pub use tree_viewer::TreeViewer;
|
||||
|
||||
mod finalized_block;
|
||||
pub use finalized_block::{FinalizedBlockReader, FinalizedBlockWriter};
|
||||
pub use finalized_block::{ChainStateBlockReader, ChainStateBlockWriter};
|
||||
|
||||
Reference in New Issue
Block a user