mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
fix: gracefully handle missing persisted_trie_updates (#13942)
This commit is contained in:
@ -7,7 +7,7 @@ use crate::{
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use futures::{Stream, StreamExt};
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_chain_state::ExecutedBlockWithTrieUpdates;
|
||||
use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineTypes};
|
||||
use reth_primitives::{NodePrimitives, RecoveredBlock};
|
||||
use reth_primitives_traits::Block;
|
||||
@ -245,7 +245,7 @@ pub enum EngineApiRequest<T: EngineTypes, N: NodePrimitives> {
|
||||
/// A request received from the consensus engine.
|
||||
Beacon(BeaconEngineMessage<T>),
|
||||
/// Request to insert an already executed block, e.g. via payload building.
|
||||
InsertExecutedBlock(ExecutedBlock<N>),
|
||||
InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
|
||||
}
|
||||
|
||||
impl<T: EngineTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use crate::metrics::PersistenceMetrics;
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_chain_state::ExecutedBlockWithTrieUpdates;
|
||||
use reth_errors::ProviderError;
|
||||
use reth_primitives::{EthPrimitives, NodePrimitives};
|
||||
use reth_provider::{
|
||||
@ -140,7 +140,7 @@ where
|
||||
|
||||
fn on_save_blocks(
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<N::Primitives>>,
|
||||
blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
|
||||
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
|
||||
let start_time = Instant::now();
|
||||
@ -181,7 +181,7 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
|
||||
///
|
||||
/// First, header, transaction, and receipt-related data should be written to static files.
|
||||
/// Then the execution history-related data will be written to the database.
|
||||
SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
|
||||
SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),
|
||||
|
||||
/// Removes block data above the given block number from the database.
|
||||
///
|
||||
@ -258,7 +258,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
/// If there are no blocks to persist, then `None` is sent in the sender.
|
||||
pub fn save_blocks(
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<T>>,
|
||||
blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
|
||||
tx: oneshot::Sender<Option<BlockNumHash>>,
|
||||
) -> Result<(), SendError<PersistenceAction<T>>> {
|
||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||
|
||||
@ -21,7 +21,8 @@ use alloy_rpc_types_engine::{
|
||||
use block_buffer::BlockBuffer;
|
||||
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
|
||||
use reth_chain_state::{
|
||||
CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
|
||||
CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
|
||||
MemoryOverlayStateProvider, NewCanonicalChain,
|
||||
};
|
||||
use reth_consensus::{Consensus, FullConsensus, PostExecutionInput};
|
||||
pub use reth_engine_primitives::InvalidBlockHook;
|
||||
@ -102,13 +103,13 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
|
||||
/// __All__ unique executed blocks by block hash that are connected to the canonical chain.
|
||||
///
|
||||
/// This includes blocks of all forks.
|
||||
blocks_by_hash: HashMap<B256, ExecutedBlock<N>>,
|
||||
blocks_by_hash: HashMap<B256, ExecutedBlockWithTrieUpdates<N>>,
|
||||
/// Executed blocks grouped by their respective block number.
|
||||
///
|
||||
/// This maps unique block number to all known blocks for that height.
|
||||
///
|
||||
/// Note: there can be multiple blocks at the same height due to forks.
|
||||
blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock<N>>>,
|
||||
blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlockWithTrieUpdates<N>>>,
|
||||
/// Map of any parent block hash to its children.
|
||||
parent_to_child: HashMap<B256, HashSet<B256>>,
|
||||
/// Map of hash to trie updates for canonical blocks that are persisted but not finalized.
|
||||
@ -136,8 +137,8 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
self.blocks_by_hash.len()
|
||||
}
|
||||
|
||||
/// Returns the [`ExecutedBlock`] by hash.
|
||||
fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
|
||||
/// Returns the [`ExecutedBlockWithTrieUpdates`] by hash.
|
||||
fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlockWithTrieUpdates<N>> {
|
||||
self.blocks_by_hash.get(&hash)
|
||||
}
|
||||
|
||||
@ -150,7 +151,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// newest to oldest. And the parent hash of the oldest block that is missing from the buffer.
|
||||
///
|
||||
/// Returns `None` if the block for the given hash is not found.
|
||||
fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
|
||||
fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlockWithTrieUpdates<N>>)> {
|
||||
let block = self.blocks_by_hash.get(&hash).cloned()?;
|
||||
let mut parent_hash = block.recovered_block().parent_hash();
|
||||
let mut blocks = vec![block];
|
||||
@ -163,7 +164,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
}
|
||||
|
||||
/// Insert executed block into the state.
|
||||
fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
|
||||
fn insert_executed(&mut self, executed: ExecutedBlockWithTrieUpdates<N>) {
|
||||
let hash = executed.recovered_block().hash();
|
||||
let parent_hash = executed.recovered_block().parent_hash();
|
||||
let block_number = executed.recovered_block().number();
|
||||
@ -194,7 +195,10 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// ## Returns
|
||||
///
|
||||
/// The removed block and the block hashes of its children.
|
||||
fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, HashSet<B256>)> {
|
||||
fn remove_by_hash(
|
||||
&mut self,
|
||||
hash: B256,
|
||||
) -> Option<(ExecutedBlockWithTrieUpdates<N>, HashSet<B256>)> {
|
||||
let executed = self.blocks_by_hash.remove(&hash)?;
|
||||
|
||||
// Remove this block from collection of children of its parent block.
|
||||
@ -903,7 +907,8 @@ where
|
||||
// This is only done for in-memory blocks, because we should not have persisted any blocks
|
||||
// that are _above_ the current canonical head.
|
||||
while current_number > current_canonical_number {
|
||||
if let Some(block) = self.executed_block_by_hash(current_hash)? {
|
||||
if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
|
||||
{
|
||||
current_hash = block.recovered_block().parent_hash();
|
||||
current_number -= 1;
|
||||
new_chain.push(block);
|
||||
@ -931,7 +936,7 @@ where
|
||||
// If the canonical chain is ahead of the new chain,
|
||||
// gather all blocks until new head number.
|
||||
while current_canonical_number > current_number {
|
||||
if let Some(block) = self.executed_block_by_hash(old_hash)? {
|
||||
if let Some(block) = self.canonical_block_by_hash(old_hash)? {
|
||||
old_chain.push(block.clone());
|
||||
old_hash = block.recovered_block().parent_hash();
|
||||
current_canonical_number -= 1;
|
||||
@ -948,7 +953,7 @@ where
|
||||
// Walk both chains from specified hashes at same height until
|
||||
// a common ancestor (fork block) is reached.
|
||||
while old_hash != current_hash {
|
||||
if let Some(block) = self.executed_block_by_hash(old_hash)? {
|
||||
if let Some(block) = self.canonical_block_by_hash(old_hash)? {
|
||||
old_hash = block.recovered_block().parent_hash();
|
||||
old_chain.push(block);
|
||||
} else {
|
||||
@ -957,7 +962,8 @@ where
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if let Some(block) = self.executed_block_by_hash(current_hash)? {
|
||||
if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
|
||||
{
|
||||
current_hash = block.recovered_block().parent_hash();
|
||||
new_chain.push(block);
|
||||
} else {
|
||||
@ -1524,7 +1530,7 @@ where
|
||||
/// Returns a batch of consecutive canonical blocks to persist in the range
|
||||
/// `(last_persisted_number .. canonical_head - threshold]` . The expected
|
||||
/// order is oldest -> newest.
|
||||
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock<N>> {
|
||||
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
|
||||
let mut blocks_to_persist = Vec::new();
|
||||
let mut current_hash = self.state.tree_state.canonical_block_hash();
|
||||
let last_persisted_number = self.persistence_state.last_persisted_block.number;
|
||||
@ -1577,19 +1583,13 @@ where
|
||||
/// has in memory.
|
||||
///
|
||||
/// For finalized blocks, this will return `None`.
|
||||
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
|
||||
fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
|
||||
trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
|
||||
// check memory first
|
||||
let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
|
||||
|
||||
if block.is_some() {
|
||||
return Ok(block)
|
||||
if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
|
||||
return Ok(Some(block.block))
|
||||
}
|
||||
|
||||
let Some((_, updates)) = self.state.tree_state.persisted_trie_updates.get(&hash) else {
|
||||
return Ok(None)
|
||||
};
|
||||
|
||||
let (block, senders) = self
|
||||
.provider
|
||||
.sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
|
||||
@ -1603,7 +1603,6 @@ where
|
||||
|
||||
Ok(Some(ExecutedBlock {
|
||||
recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
|
||||
trie: updates.clone(),
|
||||
execution_output: Arc::new(execution_output),
|
||||
hashed_state: Arc::new(hashed_state),
|
||||
}))
|
||||
@ -2042,7 +2041,21 @@ where
|
||||
|
||||
self.update_reorg_metrics(old.len());
|
||||
self.reinsert_reorged_blocks(new.clone());
|
||||
self.reinsert_reorged_blocks(old.clone());
|
||||
// Try reinserting the reorged canonical chain. This is only possible if we have
|
||||
// `persisted_trie_updatess` for those blocks.
|
||||
let old = old
|
||||
.iter()
|
||||
.filter_map(|block| {
|
||||
let (_, trie) = self
|
||||
.state
|
||||
.tree_state
|
||||
.persisted_trie_updates
|
||||
.get(&block.recovered_block.hash())
|
||||
.cloned()?;
|
||||
Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
self.reinsert_reorged_blocks(old);
|
||||
}
|
||||
|
||||
// update the tracked in-memory state with the new chain
|
||||
@ -2069,7 +2082,7 @@ where
|
||||
}
|
||||
|
||||
/// This reinserts any blocks in the new chain that do not already exist in the tree
|
||||
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
|
||||
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
|
||||
for block in new_chain {
|
||||
if self
|
||||
.state
|
||||
@ -2369,10 +2382,15 @@ where
|
||||
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
|
||||
debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
|
||||
|
||||
let executed: ExecutedBlock<N> = ExecutedBlock {
|
||||
recovered_block: Arc::new(block),
|
||||
execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
|
||||
hashed_state: Arc::new(hashed_state),
|
||||
let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
|
||||
block: ExecutedBlock {
|
||||
recovered_block: Arc::new(RecoveredBlock::new_sealed(
|
||||
sealed_block.as_ref().clone(),
|
||||
block.senders().to_vec(),
|
||||
)),
|
||||
execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
|
||||
hashed_state: Arc::new(hashed_state),
|
||||
},
|
||||
trie: Arc::new(trie_output),
|
||||
};
|
||||
|
||||
@ -2878,7 +2896,7 @@ mod tests {
|
||||
>,
|
||||
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
|
||||
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
|
||||
blocks: Vec<ExecutedBlock>,
|
||||
blocks: Vec<ExecutedBlockWithTrieUpdates>,
|
||||
action_rx: Receiver<PersistenceAction>,
|
||||
executor_provider: MockExecutorProvider,
|
||||
block_builder: TestBlockBuilder,
|
||||
@ -2949,7 +2967,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn with_blocks(mut self, blocks: Vec<ExecutedBlock>) -> Self {
|
||||
fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
|
||||
let mut blocks_by_hash = HashMap::default();
|
||||
let mut blocks_by_number = BTreeMap::new();
|
||||
let mut state_by_hash = HashMap::default();
|
||||
@ -3694,20 +3712,24 @@ mod tests {
|
||||
let chain_b = test_block_builder.create_fork(&last_block, 10);
|
||||
|
||||
for block in &chain_a {
|
||||
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
|
||||
recovered_block: Arc::new(block.clone()),
|
||||
execution_output: Arc::new(ExecutionOutcome::default()),
|
||||
hashed_state: Arc::new(HashedPostState::default()),
|
||||
test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
|
||||
block: ExecutedBlock {
|
||||
recovered_block: Arc::new(block.clone()),
|
||||
execution_output: Arc::new(ExecutionOutcome::default()),
|
||||
hashed_state: Arc::new(HashedPostState::default()),
|
||||
},
|
||||
trie: Arc::new(TrieUpdates::default()),
|
||||
});
|
||||
}
|
||||
test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
|
||||
|
||||
for block in &chain_b {
|
||||
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
|
||||
recovered_block: Arc::new(block.clone()),
|
||||
execution_output: Arc::new(ExecutionOutcome::default()),
|
||||
hashed_state: Arc::new(HashedPostState::default()),
|
||||
test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
|
||||
block: ExecutedBlock {
|
||||
recovered_block: Arc::new(block.clone()),
|
||||
execution_output: Arc::new(ExecutionOutcome::default()),
|
||||
hashed_state: Arc::new(HashedPostState::default()),
|
||||
},
|
||||
trie: Arc::new(TrieUpdates::default()),
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user