feat: enable state root task during persistence (#12392)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Dan Cline
2025-01-29 14:57:32 -05:00
committed by GitHub
parent 974cea1d38
commit 665ca2e89b
7 changed files with 462 additions and 72 deletions

Cargo.lock (generated)

@@ -8614,6 +8614,7 @@ dependencies = [
  "reth-db-api",
  "reth-errors",
  "reth-ethereum-engine-primitives",
+ "reth-ethereum-primitives",
  "reth-evm",
  "reth-execution-types",
  "reth-fs-util",


@@ -20,6 +20,7 @@ use alloy_rpc_types_engine::{
 };
 use block_buffer::BlockBuffer;
 use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
+use persistence_state::CurrentPersistenceAction;
 use reth_chain_state::{
     CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
     MemoryOverlayStateProvider, NewCanonicalChain,
@@ -377,6 +378,41 @@ impl<N: NodePrimitives> TreeState<N> {
         }
     }

+    /// Determines if the second block is a direct descendant of the first block.
+    ///
+    /// If the two blocks are the same, this returns `false`.
+    fn is_descendant(&self, first: BlockNumHash, second: &N::BlockHeader) -> bool {
+        // If the second block's parent is the first block's hash, then it is a direct
+        // descendant and we can return early.
+        if second.parent_hash() == first.hash {
+            return true
+        }
+
+        // If the second block is lower than, or has the same block number as, the first,
+        // they are not descendants.
+        if second.number() <= first.number {
+            return false
+        }
+
+        // Iterate through the parents of the second block until we reach the first block's
+        // number.
+        let Some(mut current_block) = self.block_by_hash(second.parent_hash()) else {
+            // If we can't find its parent in the tree, we can't continue, so return false.
+            return false
+        };
+        while current_block.number() > first.number + 1 {
+            let Some(block) = self.block_by_hash(current_block.header().parent_hash()) else {
+                // If we can't find its parent in the tree, we can't continue, so return false.
+                return false
+            };
+            current_block = block;
+        }
+
+        // Now the block numbers should be equal, so we compare hashes.
+        current_block.parent_hash() == first.hash
+    }
+
     /// Updates the canonical head to the given block.
     fn set_canonical_head(&mut self, new_head: BlockNumHash) {
         self.current_canonical_head = new_head;
@@ -1006,6 +1042,25 @@ where
         Ok(true)
     }

+    /// Returns whether or not the input block is a descendant of the blocks being persisted.
+    fn is_descendant_of_persisting_blocks(&self, block: &N::BlockHeader) -> bool {
+        self.persistence_state.current_action().is_none_or(|action| {
+            match action {
+                CurrentPersistenceAction::SavingBlocks { highest } => {
+                    // The block being validated can't be a descendant if its number is lower than
+                    // the highest block being persisted. In that case, it's likely a fork of a
+                    // lower block.
+                    if block.number() <= highest.number {
+                        return false
+                    }
+                    self.state.tree_state.is_descendant(*highest, block)
+                }
+                CurrentPersistenceAction::RemovingBlocks { new_tip_num: _ } => false,
+            }
+        })
+    }
+
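// Spelled out, the `is_none_or` gating above behaves as follows (a minimal sketch with bare
// heights instead of reth types; `allowed` is hypothetical and not part of this diff):
// `Option::is_none_or` returns `true` for `None`, so with no persistence task running every
// block passes, and during a save it reduces to the "strictly above `highest`" pre-check
// that runs before the ancestor walk.
#[test]
fn sketch_is_none_or_gating() {
    fn allowed(saving_highest: Option<u64>, block_number: u64) -> bool {
        saving_highest.is_none_or(|highest| block_number > highest)
    }
    assert!(allowed(None, 5)); // no persistence in progress: check passes vacuously
    assert!(allowed(Some(100), 142)); // above the highest saved block: proceed to is_descendant
    assert!(!allowed(Some(100), 42)); // at or below it: cannot be a descendant
}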
     /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
     /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
     /// chain.
@@ -1175,13 +1230,47 @@
         }
     }

+    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
+    /// the current persistence action while we're removing blocks.
+    fn remove_blocks(&mut self, new_tip_num: u64) {
+        debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
+        if new_tip_num < self.persistence_state.last_persisted_block.number {
+            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
+            let (tx, rx) = oneshot::channel();
+            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
+            self.persistence_state.start_remove(new_tip_num, rx);
+        }
+    }
+
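// Note the guard above: a removal task is only spawned when the requested tip sits strictly
// below what has already been persisted; otherwise there is nothing on disk to unwind. A
// sketch with bare numbers (hypothetical, not part of this diff):
#[test]
fn sketch_remove_blocks_guard() {
    let last_persisted_block_number = 10u64;
    // remove_blocks(8) starts a removal task: 8 < 10
    assert!(8 < last_persisted_block_number);
    // remove_blocks(12) is a no-op: nothing above height 12 has been persisted yet
    assert!(!(12 < last_persisted_block_number));
}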
+    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
+    /// the current persistence action while we're saving blocks.
+    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
+        if blocks_to_persist.is_empty() {
+            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
+            return
+        }
+
+        // NOTE: checked non-empty above
+        let highest_num_hash = blocks_to_persist
+            .iter()
+            .max_by_key(|block| block.recovered_block().number())
+            .map(|b| b.recovered_block().num_hash())
+            .expect("Checked non-empty persisting blocks");
+
+        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
+
+        let (tx, rx) = oneshot::channel();
+        let _ = self.persistence.save_blocks(blocks_to_persist, tx);
+        self.persistence_state.start_save(highest_num_hash, rx);
+    }
+
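// `persist_blocks` picks `highest_num_hash` with `max_by_key`, so the batch does not need to
// arrive sorted by height. A minimal sketch of that selection (bare numbers standing in for
// `ExecutedBlockWithTrieUpdates`; hypothetical, not part of this diff):
#[test]
fn sketch_highest_selection() {
    let heights = [3u64, 5, 4];
    let highest = heights.iter().copied().max_by_key(|height| *height);
    assert_eq!(highest, Some(5)); // the persistence action records height 5 as `highest`
}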
     /// Attempts to advance the persistence state.
     ///
     /// If we're currently awaiting a response this will try to receive the response (non-blocking)
     /// or send a new persistence action if necessary.
     fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
         if self.persistence_state.in_progress() {
-            let (mut rx, start_time) = self
+            let (mut rx, start_time, current_action) = self
                 .persistence_state
                 .rx
                 .take()
@@ -1207,29 +1296,18 @@ where
                     self.on_new_persisted_block()?;
                 }
                 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
-                Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
+                Err(TryRecvError::Empty) => {
+                    self.persistence_state.rx = Some((rx, start_time, current_action))
+                }
             }
         }

         if !self.persistence_state.in_progress() {
             if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
-                debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
-                if new_tip_num < self.persistence_state.last_persisted_block.number {
-                    debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
-                    let (tx, rx) = oneshot::channel();
-                    let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
-                    self.persistence_state.start(rx);
-                }
+                self.remove_blocks(new_tip_num)
             } else if self.should_persist() {
                 let blocks_to_persist = self.get_canonical_blocks_to_persist();
-                if blocks_to_persist.is_empty() {
-                    debug!(target: "engine::tree", "Returned empty set of blocks to persist");
-                } else {
-                    debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
-                    let (tx, rx) = oneshot::channel();
-                    let _ = self.persistence.save_blocks(blocks_to_persist, tx);
-                    self.persistence_state.start(rx);
-                }
+                self.persist_blocks(blocks_to_persist);
             }
         }
@@ -1510,7 +1588,7 @@
     /// Returns true if the canonical chain length minus the last persisted
     /// block is greater than or equal to the persistence threshold and
     /// backfill is not running.
-    const fn should_persist(&self) -> bool {
+    pub const fn should_persist(&self) -> bool {
         if !self.backfill_sync_state.is_idle() {
             // can't persist if backfill is running
             return false
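// The distance check described in the doc comment, with concrete numbers (hypothetical
// heights, not reth types; not part of this diff): with a persistence threshold of 2, a last
// persisted block at height 10, and a canonical tip at height 13, persistence should run.
#[test]
fn sketch_should_persist_distance() {
    let (last_persisted, tip, persistence_threshold) = (10u64, 13u64, 2u64);
    // 13 - 10 = 3 >= 2, so should_persist() returns true once backfill is idle
    assert!(tip.saturating_sub(last_persisted) >= persistence_threshold);
}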
@@ -2268,29 +2346,40 @@ where
         let state_provider =
             CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);

-        let sealed_block = Arc::new(block.clone_sealed_block());
-
         trace!(target: "engine::tree", block=?block_num_hash, "Executing block");

         let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));

-        let persistence_not_in_progress = !self.persistence_state.in_progress();
+        let sealed_block = Arc::new(block.clone_sealed_block());

-        let (state_root_handle, state_root_task_config, state_hook) = if persistence_not_in_progress &&
-            self.config.use_state_root_task()
-        {
-            let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
-            let state_root_config = StateRootConfig::new_from_input(
-                consistent_view.clone(),
-                self.compute_trie_input(consistent_view, block.header().parent_hash())
-                    .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?,
-            );
-            let state_root_task =
-                StateRootTask::new(state_root_config.clone(), self.state_root_task_pool.clone());
-            let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
-            (Some(state_root_task.spawn()), Some(state_root_config), state_hook)
-        } else {
-            (None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
-        };
+        // We only run the parallel state root if we are currently persisting blocks that are all
+        // ancestors of the one we are executing. If we're committing ancestor blocks, then: any
+        // trie updates being committed are a subset of the in-memory trie updates collected before
+        // fetching reverts. So any diff in reverts (pre vs post commit) is already covered by the
+        // in-memory trie updates we collect in `compute_state_root_parallel`.
+        //
+        // See https://github.com/paradigmxyz/reth/issues/12688 for more details
+        let is_descendant_of_persisting_blocks =
+            self.is_descendant_of_persisting_blocks(block.header());
+        let (state_root_handle, state_root_task_config, state_hook) =
+            if is_descendant_of_persisting_blocks && self.config.use_state_root_task() {
+                let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
+                let state_root_config = StateRootConfig::new_from_input(
+                    consistent_view.clone(),
+                    self.compute_trie_input(consistent_view, block.header().parent_hash())
+                        .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?,
+                );
+                let state_root_task = StateRootTask::new(
+                    state_root_config.clone(),
+                    self.state_root_task_pool.clone(),
+                );
+                let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
+                (Some(state_root_task.spawn()), Some(state_root_config), state_hook)
+            } else {
+                (None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
+            };

         let execution_start = Instant::now();
         let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?;
@@ -2317,7 +2406,7 @@
         // is being persisted as we are computing in parallel, because we initialize
         // a different database transaction per thread and it might end up with a
         // different view of the database.
-        let (state_root, trie_output, root_elapsed) = if persistence_not_in_progress {
+        let (state_root, trie_output, root_elapsed) = if is_descendant_of_persisting_blocks {
             if self.config.use_state_root_task() {
                 let state_root_handle = state_root_handle
                     .expect("state root handle must exist if use_state_root_task is true");
@@ -2354,7 +2443,7 @@
                 }
             }
         } else {
-            debug!(target: "engine::tree", block=?block_num_hash, ?persistence_not_in_progress, "Failed to compute state root in parallel");
+            debug!(target: "engine::tree", block=?block_num_hash, ?is_descendant_of_persisting_blocks, "Failed to compute state root in parallel");
             let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
             (root, updates, root_time.elapsed())
         };
@@ -3413,6 +3502,29 @@ mod tests {
         assert!(resp.is_syncing());
     }

+    #[test]
+    fn test_tree_state_normal_descendant() {
+        let mut tree_state = TreeState::new(BlockNumHash::default());
+        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
+
+        tree_state.insert_executed(blocks[0].clone());
+        assert!(tree_state.is_descendant(
+            blocks[0].recovered_block().num_hash(),
+            blocks[1].recovered_block().header()
+        ));
+
+        tree_state.insert_executed(blocks[1].clone());
+
+        assert!(tree_state.is_descendant(
+            blocks[0].recovered_block().num_hash(),
+            blocks[2].recovered_block().header()
+        ));
+        assert!(tree_state.is_descendant(
+            blocks[1].recovered_block().num_hash(),
+            blocks[2].recovered_block().header()
+        ));
+    }
+
     #[tokio::test]
     async fn test_tree_state_insert_executed() {
         let mut tree_state = TreeState::new(BlockNumHash::default());
@@ -3629,11 +3741,18 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_tree_state_on_new_head() {
+    async fn test_tree_state_on_new_head_reorg() {
+        reth_tracing::init_test_tracing();
         let chain_spec = MAINNET.clone();
-        let mut test_harness = TestHarness::new(chain_spec);
-        let mut test_block_builder = TestBlockBuilder::eth();

+        // Set persistence_threshold to 1
+        let mut test_harness = TestHarness::new(chain_spec);
+        test_harness.tree.config = test_harness
+            .tree
+            .config
+            .with_persistence_threshold(1)
+            .with_memory_block_buffer_target(1);
+        let mut test_block_builder = TestBlockBuilder::eth();
         let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();

         for block in &blocks {
@@ -3668,9 +3787,51 @@
             assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
         }

+        // should be a None persistence action before we advance persistence
+        let current_action = test_harness.tree.persistence_state.current_action();
+        assert_eq!(current_action, None);
+
+        // let's attempt to persist and check that it attempts to save blocks
+        //
+        // since in-memory block buffer target and persistence_threshold are both 1, this should
+        // save all but the current tip of the canonical chain (up to blocks[1])
+        test_harness.tree.advance_persistence().unwrap();
+        let current_action = test_harness.tree.persistence_state.current_action().cloned();
+        assert_eq!(
+            current_action,
+            Some(CurrentPersistenceAction::SavingBlocks {
+                highest: blocks[1].recovered_block().num_hash()
+            })
+        );
+
+        // get rid of the prev action
+        let received_action = test_harness.action_rx.recv().unwrap();
+        let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
+            panic!("received wrong action");
+        };
+        assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
+
+        // send the response so we can advance again
+        sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
+
+        // we should be persisting blocks[1] because we threw out the prev action
+        let current_action = test_harness.tree.persistence_state.current_action().cloned();
+        assert_eq!(
+            current_action,
+            Some(CurrentPersistenceAction::SavingBlocks {
+                highest: blocks[1].recovered_block().num_hash()
+            })
+        );
+
+        // after advancing persistence, we should be at `None` for the next action
+        test_harness.tree.advance_persistence().unwrap();
+        let current_action = test_harness.tree.persistence_state.current_action().cloned();
+        assert_eq!(current_action, None);
+
         // reorg case
         let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
         assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
         if let Some(NewCanonicalChain::Reorg { new, old }) = result {
             assert_eq!(new.len(), 3);
             assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
@@ -3680,6 +3841,29 @@
             assert_eq!(old.len(), 1);
             assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
         }
+
+        // The canonical block has not changed, so we will not get any active persistence action
+        test_harness.tree.advance_persistence().unwrap();
+        let current_action = test_harness.tree.persistence_state.current_action().cloned();
+        assert_eq!(current_action, None);
+
+        // Let's change the canonical head and advance persistence
+        test_harness
+            .tree
+            .state
+            .tree_state
+            .set_canonical_head(fork_block_5.recovered_block().num_hash());
+
+        // The canonical block has changed now, we should get fork_block_4 due to the persistence
+        // threshold and in memory block buffer target
+        test_harness.tree.advance_persistence().unwrap();
+        let current_action = test_harness.tree.persistence_state.current_action().cloned();
+        assert_eq!(
+            current_action,
+            Some(CurrentPersistenceAction::SavingBlocks {
+                highest: fork_block_4.recovered_block().num_hash()
+            })
+        );
     }
     #[tokio::test]

@@ -3804,6 +3988,15 @@ mod tests {
         // check that the original block 4 is still included
         assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
             b.recovered_block().hash() == blocks[4].recovered_block().hash()));
+
+        // check that if we advance persistence, the persistence action is the correct value
+        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
+        assert_eq!(
+            test_harness.tree.persistence_state.current_action().cloned(),
+            Some(CurrentPersistenceAction::SavingBlocks {
+                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
+            })
+        );
     }

     #[tokio::test]


@@ -13,7 +13,8 @@ pub struct PersistenceState {
     pub(crate) last_persisted_block: BlockNumHash,
     /// Receiver end of channel where the result of the persistence task will be
     /// sent when done. A None value means there's no persistence task in progress.
-    pub(crate) rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
+    pub(crate) rx:
+        Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
     /// The block above which blocks should be removed from disk, because there has been an on disk
     /// reorg.
     pub(crate) remove_above_state: VecDeque<u64>,
@@ -26,9 +27,29 @@ impl PersistenceState {
         self.rx.is_some()
     }

-    /// Sets state for a started persistence task.
-    pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
-        self.rx = Some((rx, Instant::now()));
+    /// Sets the state for a block removal operation.
+    pub(crate) fn start_remove(
+        &mut self,
+        new_tip_num: u64,
+        rx: oneshot::Receiver<Option<BlockNumHash>>,
+    ) {
+        self.rx =
+            Some((rx, Instant::now(), CurrentPersistenceAction::RemovingBlocks { new_tip_num }));
+    }
+
+    /// Sets the state for a block save operation.
+    pub(crate) fn start_save(
+        &mut self,
+        highest: BlockNumHash,
+        rx: oneshot::Receiver<Option<BlockNumHash>>,
+    ) {
+        self.rx = Some((rx, Instant::now(), CurrentPersistenceAction::SavingBlocks { highest }));
+    }
+
+    /// Returns the current persistence action. If there is no persistence task in progress, then
+    /// this returns `None`.
+    pub(crate) fn current_action(&self) -> Option<&CurrentPersistenceAction> {
+        self.rx.as_ref().map(|rx| &rx.2)
     }

     /// Sets the `remove_above_state`, to the new tip number specified, only if it is less than the
@@ -50,3 +71,18 @@ impl PersistenceState {
             BlockNumHash::new(last_persisted_block_number, last_persisted_block_hash);
     }
}

+/// The currently running persistence action.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum CurrentPersistenceAction {
+    /// The persistence task is saving blocks.
+    SavingBlocks {
+        /// The highest block being saved.
+        highest: BlockNumHash,
+    },
+    /// The persistence task is removing blocks.
+    RemovingBlocks {
+        /// The tip, above which we are removing blocks.
+        new_tip_num: u64,
+    },
+}
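// Usage sketch for the new action tracking (the oneshot endpoints, the `state` value, and
// `highest` are assumed inputs; not part of this diff): starting a save records the action
// alongside the receiver, and `current_action` exposes it until the `rx` slot is taken.
fn sketch_action_tracking(mut state: PersistenceState, highest: BlockNumHash) {
    let (_tx, rx) = oneshot::channel();
    state.start_save(highest, rx);
    assert_eq!(
        state.current_action(),
        Some(&CurrentPersistenceAction::SavingBlocks { highest })
    );

    // a removal instead records the requested tip
    let (_tx, rx) = oneshot::channel();
    state.start_remove(5, rx);
    assert_eq!(
        state.current_action(),
        Some(&CurrentPersistenceAction::RemovingBlocks { new_tip_num: 5 })
    );
}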


@@ -86,7 +86,6 @@ where
         + HeaderProvider
         + HashingWriter
         + StateWriter
-        + StateWriter
         + AsRef<PF::ProviderRW>,
     PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
 {


@@ -184,6 +184,12 @@ pub enum ConsistentViewError {
         /// The tip diff.
         tip: GotExpected<Option<B256>>,
     },
+    /// Error thrown when the database does not contain a block from the previous database view.
+    #[display("database view no longer contains block: {block:?}")]
+    Reorged {
+        /// The previous block
+        block: B256,
+    },
 }

 impl From<ConsistentViewError> for ProviderError {


@@ -73,6 +73,7 @@ reth-chain-state = { workspace = true, features = ["test-utils"] }
 reth-trie = { workspace = true, features = ["test-utils"] }
 reth-testing-utils.workspace = true
 reth-ethereum-engine-primitives.workspace = true
+reth-ethereum-primitives.workspace = true
 parking_lot.workspace = true
 tempfile.workspace = true
@@ -105,6 +106,7 @@ test-utils = [
     "reth-trie/test-utils",
     "reth-chain-state/test-utils",
     "reth-ethereum-engine-primitives",
+    "reth-ethereum-primitives/test-utils",
     "reth-chainspec/test-utils",
     "reth-evm/test-utils",
     "reth-network-p2p/test-utils",


@@ -1,7 +1,6 @@
 use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
 use alloy_primitives::B256;
 use reth_errors::ProviderError;
-use reth_primitives::GotExpected;
 use reth_storage_api::{DBProvider, StateCommitmentProvider};
 use reth_storage_errors::provider::ProviderResult;

@@ -28,7 +27,7 @@ pub use reth_storage_errors::provider::ConsistentViewError;
 #[derive(Clone, Debug)]
 pub struct ConsistentDbView<Factory> {
     factory: Factory,
-    tip: Option<B256>,
+    tip: Option<(B256, u64)>,
 }

 impl<Factory> ConsistentDbView<Factory>
@@ -37,7 +36,7 @@ where
         + StateCommitmentProvider,
 {
     /// Creates new consistent database view.
-    pub const fn new(factory: Factory, tip: Option<B256>) -> Self {
+    pub const fn new(factory: Factory, tip: Option<(B256, u64)>) -> Self {
         Self { factory, tip }
     }

@@ -45,7 +44,7 @@ where
     pub fn new_with_latest_tip(provider: Factory) -> ProviderResult<Self> {
         let provider_ro = provider.database_provider_ro()?;
         let last_num = provider_ro.last_block_number()?;
-        let tip = provider_ro.sealed_header(last_num)?.map(|h| h.hash());
+        let tip = provider_ro.sealed_header(last_num)?.map(|h| (h.hash(), last_num));
         Ok(Self::new(provider, tip))
     }

@@ -71,31 +70,185 @@ where
         // Create a new provider.
         let provider_ro = self.factory.database_provider_ro()?;

-        // Check that the latest stored header number matches the number
-        // that consistent view was initialized with.
-        // The mismatch can happen if a new block was appended while
-        // the view was being used.
-        // We compare block hashes instead of block numbers to account for reorgs.
-        let last_num = provider_ro.last_block_number()?;
-        let tip = provider_ro.sealed_header(last_num)?.map(|h| h.hash());
-        if self.tip != tip {
-            return Err(ConsistentViewError::Inconsistent {
-                tip: GotExpected { got: tip, expected: self.tip },
-            }
-            .into())
-        }
-
-        // Check that the best block number is the same as the latest stored header.
-        // This ensures that the consistent view cannot be used for initializing new providers
-        // if the node fell back to the staged sync.
-        let best_block_number = provider_ro.best_block_number()?;
-        if last_num != best_block_number {
-            return Err(ConsistentViewError::Syncing {
-                best_block: GotExpected { got: best_block_number, expected: last_num },
-            }
-            .into())
-        }
+        // Check that the currently stored tip is included on-disk.
+        // This means that the database may have moved, but the view was not reorged.
+        //
+        // NOTE: We must use `sealed_header` with the block number here, because if we are using
+        // the consistent view provider while we're persisting blocks, we may enter a race
+        // condition. Recall that we always commit to static files first, then the database, and
+        // that block hash to block number indexes are contained in the database. If we were to
+        // fetch the block by hash while we're persisting, the following situation may occur:
+        //
+        // 1. Persistence appends the latest block to static files.
+        // 2. We initialize the consistent view provider, which fetches based on
+        //    `last_block_number` and `sealed_header`, which both check static files, setting the
+        //    tip to the newly committed block.
+        // 3. We attempt to fetch a header by hash, using for example the `header` method. This
+        //    checks the database first, to fetch the number corresponding to the hash. Because
+        //    the database has not been committed yet, this fails, and we return
+        //    `ConsistentViewError::Reorged`.
+        // 4. Some time later, the database commits.
+        //
+        // To ensure this doesn't happen, we just have to make sure that we fetch from the same
+        // data source that we used during initialization. In this case, that is static files.
+        if let Some((hash, number)) = self.tip {
+            if provider_ro.sealed_header(number)?.is_none_or(|header| header.hash() != hash) {
+                return Err(ConsistentViewError::Reorged { block: hash }.into())
+            }
+        }

         Ok(provider_ro)
     }
}
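// A standalone sketch (toy types, not reth APIs; not part of this diff) of the rule the
// comment above describes: record `(hash, number)` at initialization, and on every re-open
// fetch the header by *number* from the same data source, treating a missing or different
// header at that height as a reorg of the view. Looking up by number sidesteps the
// hash-to-number database index, which may lag behind static files mid-persistence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ToyHeader { number: u64, hash: u64 }

struct ToyStaticFiles { headers: Vec<ToyHeader> }

impl ToyStaticFiles {
    fn last(&self) -> Option<ToyHeader> { self.headers.last().copied() }
    fn by_number(&self, number: u64) -> Option<ToyHeader> {
        self.headers.iter().find(|h| h.number == number).copied()
    }
}

struct ToyView { tip: Option<(u64, u64)> } // (hash, number), as in `ConsistentDbView`

impl ToyView {
    fn new(sf: &ToyStaticFiles) -> Self {
        Self { tip: sf.last().map(|h| (h.hash, h.number)) }
    }

    fn check(&self, sf: &ToyStaticFiles) -> Result<(), u64> {
        if let Some((hash, number)) = self.tip {
            // Only a genuinely removed or replaced header at `number` fails this check;
            // blocks appended after initialization leave it untouched.
            if sf.by_number(number).is_none_or(|h| h.hash != hash) {
                return Err(hash) // stands in for ConsistentViewError::Reorged { block }
            }
        }
        Ok(())
    }
}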
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+
+    use super::*;
+    use crate::{
+        test_utils::create_test_provider_factory_with_chain_spec, BlockWriter,
+        StaticFileProviderFactory, StaticFileWriter,
+    };
+    use alloy_primitives::Bytes;
+    use assert_matches::assert_matches;
+    use reth_chainspec::{EthChainSpec, MAINNET};
+    use reth_ethereum_primitives::{Block, BlockBody};
+    use reth_primitives::StaticFileSegment;
+    use reth_primitives_traits::{block::TestBlock, RecoveredBlock, SealedBlock};
+    use reth_storage_api::StorageLocation;
+
+    #[test]
+    fn test_consistent_view_extend() {
+        let provider_factory = create_test_provider_factory_with_chain_spec(MAINNET.clone());
+
+        let genesis_header = MAINNET.genesis_header();
+        let genesis_block =
+            SealedBlock::<Block>::seal_parts(genesis_header.clone(), BlockBody::default());
+        let genesis_hash: B256 = genesis_block.hash();
+        let genesis_block = RecoveredBlock::new_sealed(genesis_block, vec![]);
+
+        // insert the block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(genesis_block, StorageLocation::StaticFiles).unwrap();
+        provider_rw.commit().unwrap();
+
+        // create a consistent view provider and check that a ro provider can be made
+        let view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap();
+
+        // ensure successful creation of a read-only provider.
+        assert_matches!(view.provider_ro(), Ok(_));
+
+        // generate a block that extends the genesis
+        let mut block = Block::default();
+        block.header_mut().parent_hash = genesis_hash;
+        block.header_mut().number = 1;
+        let sealed_block = SealedBlock::seal_slow(block);
+        let recovered_block = RecoveredBlock::new_sealed(sealed_block, vec![]);
+
+        // insert the block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(recovered_block, StorageLocation::StaticFiles).unwrap();
+        provider_rw.commit().unwrap();
+
+        // ensure successful creation of a read-only provider, based on this new db state.
+        assert_matches!(view.provider_ro(), Ok(_));
+
+        // generate a block that extends that block
+        let mut block = Block::default();
+        block.header_mut().parent_hash = genesis_hash;
+        block.header_mut().number = 2;
+        let sealed_block = SealedBlock::seal_slow(block);
+        let recovered_block = RecoveredBlock::new_sealed(sealed_block, vec![]);
+
+        // insert the block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(recovered_block, StorageLocation::StaticFiles).unwrap();
+        provider_rw.commit().unwrap();
+
+        // check that creation of a read-only provider still works
+        assert_matches!(view.provider_ro(), Ok(_));
+    }
+
+    #[test]
+    fn test_consistent_view_remove() {
+        let provider_factory = create_test_provider_factory_with_chain_spec(MAINNET.clone());
+
+        let genesis_header = MAINNET.genesis_header();
+        let genesis_block =
+            SealedBlock::<Block>::seal_parts(genesis_header.clone(), BlockBody::default());
+        let genesis_hash: B256 = genesis_block.hash();
+        let genesis_block = RecoveredBlock::new_sealed(genesis_block, vec![]);
+
+        // insert the block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(genesis_block, StorageLocation::Both).unwrap();
+        provider_rw.0.static_file_provider().commit().unwrap();
+        provider_rw.commit().unwrap();
+
+        // create a consistent view provider and check that a ro provider can be made
+        let view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap();
+
+        // ensure successful creation of a read-only provider.
+        assert_matches!(view.provider_ro(), Ok(_));
+
+        // generate a block that extends the genesis
+        let mut block = Block::default();
+        block.header_mut().parent_hash = genesis_hash;
+        block.header_mut().number = 1;
+        let sealed_block = SealedBlock::seal_slow(block);
+        let recovered_block = RecoveredBlock::new_sealed(sealed_block.clone(), vec![]);
+
+        // insert the block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(recovered_block, StorageLocation::Both).unwrap();
+        provider_rw.0.static_file_provider().commit().unwrap();
+        provider_rw.commit().unwrap();
+
+        // create a second consistent view provider and check that a ro provider can be made
+        let view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap();
+        let initial_tip_hash = sealed_block.hash();
+
+        // ensure successful creation of a read-only provider, based on this new db state.
+        assert_matches!(view.provider_ro(), Ok(_));
+
+        // remove the block above the genesis block
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.remove_blocks_above(0, StorageLocation::Both).unwrap();
+        let sf_provider = provider_rw.0.static_file_provider();
+        sf_provider.get_writer(1, StaticFileSegment::Headers).unwrap().prune_headers(1).unwrap();
+        sf_provider.commit().unwrap();
+        provider_rw.commit().unwrap();
+
+        // ensure unsuccessful creation of a read-only provider, based on this new db state.
+        let Err(ProviderError::ConsistentView(boxed_consistent_view_err)) = view.provider_ro()
+        else {
+            panic!("expected reorged consistent view error, got success");
+        };
+        let unboxed = *boxed_consistent_view_err;
+        assert_eq!(unboxed, ConsistentViewError::Reorged { block: initial_tip_hash });
+
+        // generate a block that extends the genesis with a different hash
+        let mut block = Block::default();
+        block.header_mut().parent_hash = genesis_hash;
+        block.header_mut().number = 1;
+        block.header_mut().extra_data =
+            Bytes::from_str("6a6f75726e657920746f20697468616361").unwrap();
+        let sealed_block = SealedBlock::seal_slow(block);
+        let recovered_block = RecoveredBlock::new_sealed(sealed_block, vec![]);
+
+        // reinsert the block at the same height, but with a different hash
+        let provider_rw = provider_factory.provider_rw().unwrap();
+        provider_rw.insert_block(recovered_block, StorageLocation::Both).unwrap();
+        provider_rw.0.static_file_provider().commit().unwrap();
+        provider_rw.commit().unwrap();
+
+        // ensure unsuccessful creation of a read-only provider, based on this new db state.
+        let Err(ProviderError::ConsistentView(boxed_consistent_view_err)) = view.provider_ro()
+        else {
+            panic!("expected reorged consistent view error, got success");
+        };
+        let unboxed = *boxed_consistent_view_err;
+        assert_eq!(unboxed, ConsistentViewError::Reorged { block: initial_tip_hash });
+    }
+}
}