mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
fix(tree): remove_blocks fixes, return hash and number in persistence task (#10678)
This commit is contained in:
@ -288,11 +288,24 @@ impl CanonicalInMemoryState {
|
|||||||
///
|
///
|
||||||
/// This will update the links between blocks and remove all blocks that are [..
|
/// This will update the links between blocks and remove all blocks that are [..
|
||||||
/// `persisted_height`].
|
/// `persisted_height`].
|
||||||
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
|
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
|
||||||
|
// if the persisted hash is not in the canonical in memory state, do nothing, because it
|
||||||
|
// means canonical blocks were not actually persisted.
|
||||||
|
//
|
||||||
|
// This can happen if the persistence task takes a long time, while a reorg is happening.
|
||||||
|
{
|
||||||
|
if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
|
||||||
|
// do nothing
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut blocks = self.inner.in_memory_state.blocks.write();
|
let mut blocks = self.inner.in_memory_state.blocks.write();
|
||||||
let mut numbers = self.inner.in_memory_state.numbers.write();
|
let mut numbers = self.inner.in_memory_state.numbers.write();
|
||||||
|
|
||||||
|
let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;
|
||||||
|
|
||||||
// clear all numbers
|
// clear all numbers
|
||||||
numbers.clear();
|
numbers.clear();
|
||||||
|
|
||||||
|
|||||||
@ -1,10 +1,8 @@
|
|||||||
#![allow(dead_code)]
|
|
||||||
|
|
||||||
use crate::metrics::PersistenceMetrics;
|
use crate::metrics::PersistenceMetrics;
|
||||||
use reth_chain_state::ExecutedBlock;
|
use reth_chain_state::ExecutedBlock;
|
||||||
use reth_errors::ProviderError;
|
use reth_errors::ProviderError;
|
||||||
use reth_node_types::NodeTypesWithDB;
|
use reth_node_types::NodeTypesWithDB;
|
||||||
use reth_primitives::B256;
|
use reth_primitives::BlockNumHash;
|
||||||
use reth_provider::{
|
use reth_provider::{
|
||||||
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
|
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
|
||||||
StaticFileProviderFactory,
|
StaticFileProviderFactory,
|
||||||
@ -87,7 +85,10 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
|
fn on_remove_blocks_above(
|
||||||
|
&self,
|
||||||
|
new_tip_num: u64,
|
||||||
|
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||||
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
|
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
let provider_rw = self.provider.provider_rw()?;
|
let provider_rw = self.provider.provider_rw()?;
|
||||||
@ -97,16 +98,22 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
|
|||||||
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
|
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
|
||||||
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
|
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
|
||||||
|
|
||||||
|
debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
|
||||||
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
|
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
|
||||||
Ok(new_tip_hash)
|
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
|
fn on_save_blocks(
|
||||||
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
|
&self,
|
||||||
|
blocks: Vec<ExecutedBlock>,
|
||||||
|
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||||
|
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
let last_block_hash = blocks.last().map(|block| block.block().hash());
|
let last_block_hash_num = blocks
|
||||||
|
.last()
|
||||||
|
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });
|
||||||
|
|
||||||
if last_block_hash.is_some() {
|
if last_block_hash_num.is_some() {
|
||||||
let provider_rw = self.provider.provider_rw()?;
|
let provider_rw = self.provider.provider_rw()?;
|
||||||
let static_file_provider = self.provider.static_file_provider();
|
let static_file_provider = self.provider.static_file_provider();
|
||||||
|
|
||||||
@ -114,7 +121,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
|
|||||||
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
|
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
|
||||||
}
|
}
|
||||||
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
|
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
|
||||||
Ok(last_block_hash)
|
Ok(last_block_hash_num)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,13 +145,13 @@ pub enum PersistenceAction {
|
|||||||
///
|
///
|
||||||
/// First, header, transaction, and receipt-related data should be written to static files.
|
/// First, header, transaction, and receipt-related data should be written to static files.
|
||||||
/// Then the execution history-related data will be written to the database.
|
/// Then the execution history-related data will be written to the database.
|
||||||
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
|
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>),
|
||||||
|
|
||||||
/// Removes block data above the given block number from the database.
|
/// Removes block data above the given block number from the database.
|
||||||
///
|
///
|
||||||
/// This will first update checkpoints from the database, then remove actual block data from
|
/// This will first update checkpoints from the database, then remove actual block data from
|
||||||
/// static files.
|
/// static files.
|
||||||
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
|
RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
|
||||||
|
|
||||||
/// Prune associated block data before the given block number, according to already-configured
|
/// Prune associated block data before the given block number, according to already-configured
|
||||||
/// prune modes.
|
/// prune modes.
|
||||||
@ -209,7 +216,7 @@ impl PersistenceHandle {
|
|||||||
pub fn save_blocks(
|
pub fn save_blocks(
|
||||||
&self,
|
&self,
|
||||||
blocks: Vec<ExecutedBlock>,
|
blocks: Vec<ExecutedBlock>,
|
||||||
tx: oneshot::Sender<Option<B256>>,
|
tx: oneshot::Sender<Option<BlockNumHash>>,
|
||||||
) -> Result<(), SendError<PersistenceAction>> {
|
) -> Result<(), SendError<PersistenceAction>> {
|
||||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||||
}
|
}
|
||||||
@ -222,7 +229,7 @@ impl PersistenceHandle {
|
|||||||
pub fn remove_blocks_above(
|
pub fn remove_blocks_above(
|
||||||
&self,
|
&self,
|
||||||
block_num: u64,
|
block_num: u64,
|
||||||
tx: oneshot::Sender<Option<B256>>,
|
tx: oneshot::Sender<Option<BlockNumHash>>,
|
||||||
) -> Result<(), SendError<PersistenceAction>> {
|
) -> Result<(), SendError<PersistenceAction>> {
|
||||||
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
|
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
|
||||||
}
|
}
|
||||||
@ -296,7 +303,8 @@ mod tests {
|
|||||||
|
|
||||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||||
|
|
||||||
let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
|
let BlockNumHash { hash: actual_hash, number: _ } =
|
||||||
|
tokio::time::timeout(std::time::Duration::from_secs(10), rx)
|
||||||
.await
|
.await
|
||||||
.expect("test timed out")
|
.expect("test timed out")
|
||||||
.expect("channel closed unexpectedly")
|
.expect("channel closed unexpectedly")
|
||||||
@ -316,8 +324,7 @@ mod tests {
|
|||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||||
|
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
|
||||||
let actual_hash = rx.await.unwrap().unwrap();
|
|
||||||
assert_eq!(last_hash, actual_hash);
|
assert_eq!(last_hash, actual_hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -335,7 +342,7 @@ mod tests {
|
|||||||
|
|
||||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||||
|
|
||||||
let actual_hash = rx.await.unwrap().unwrap();
|
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
|
||||||
assert_eq!(last_hash, actual_hash);
|
assert_eq!(last_hash, actual_hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -201,6 +201,108 @@ impl TreeState {
|
|||||||
Some((executed, children))
|
Some((executed, children))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns whether or not the hash is part of the canonical chain.
|
||||||
|
pub(crate) fn is_canonical(&self, hash: B256) -> bool {
|
||||||
|
let mut current_block = self.current_canonical_head.hash;
|
||||||
|
if current_block == hash {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(executed) = self.blocks_by_hash.get(¤t_block) {
|
||||||
|
current_block = executed.block.parent_hash;
|
||||||
|
if current_block == hash {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes canonical blocks below the upper bound, only if the last persisted hash is
|
||||||
|
/// part of the canonical chain.
|
||||||
|
pub(crate) fn remove_canonical_until(
|
||||||
|
&mut self,
|
||||||
|
upper_bound: BlockNumber,
|
||||||
|
last_persisted_hash: B256,
|
||||||
|
) {
|
||||||
|
debug!(target: "engine", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
|
||||||
|
|
||||||
|
// If the last persisted hash is not canonical, then we don't want to remove any canonical
|
||||||
|
// blocks yet.
|
||||||
|
if !self.is_canonical(last_persisted_hash) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// First, let's walk back the canonical chain and remove canonical blocks lower than the
|
||||||
|
// upper bound
|
||||||
|
let mut current_block = self.current_canonical_head.hash;
|
||||||
|
while let Some(executed) = self.blocks_by_hash.get(¤t_block) {
|
||||||
|
current_block = executed.block.parent_hash;
|
||||||
|
if executed.block.number <= upper_bound {
|
||||||
|
debug!(target: "engine", num_hash=?executed.block.num_hash(), "Attempting to remove block walking back from the head");
|
||||||
|
if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
|
||||||
|
debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed block walking back from the head");
|
||||||
|
// finally, move the trie updates
|
||||||
|
self.persisted_trie_updates
|
||||||
|
.insert(removed.block.hash(), (removed.block.number, removed.trie));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes all blocks that are below the finalized block, as well as removing non-canonical
|
||||||
|
/// sidechains that fork from below the finalized block.
|
||||||
|
pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
|
||||||
|
let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
|
||||||
|
|
||||||
|
// We remove disconnected sidechains in three steps:
|
||||||
|
// * first, remove everything with a block number __below__ the finalized block.
|
||||||
|
// * next, we populate a vec with parents __at__ the finalized block.
|
||||||
|
// * finally, we iterate through the vec, removing children until the vec is empty
|
||||||
|
// (BFS).
|
||||||
|
|
||||||
|
// We _exclude_ the finalized block because we will be dealing with the blocks __at__
|
||||||
|
// the finalized block later.
|
||||||
|
let blocks_to_remove = self
|
||||||
|
.blocks_by_number
|
||||||
|
.range((Bound::Unbounded, Bound::Excluded(finalized_num)))
|
||||||
|
.flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
for hash in blocks_to_remove {
|
||||||
|
if let Some((removed, _)) = self.remove_by_hash(hash) {
|
||||||
|
debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain block");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove trie updates that are below the finalized block
|
||||||
|
self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num < finalized_num);
|
||||||
|
|
||||||
|
// The only block that should remain at the `finalized` number now, is the finalized
|
||||||
|
// block, if it exists.
|
||||||
|
//
|
||||||
|
// For all other blocks, we first put their children into this vec.
|
||||||
|
// Then, we will iterate over them, removing them, adding their children, etc etc,
|
||||||
|
// until the vec is empty.
|
||||||
|
let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
|
||||||
|
|
||||||
|
// re-insert the finalized hash if we removed it
|
||||||
|
if let Some(position) =
|
||||||
|
blocks_to_remove.iter().position(|b| b.block.hash() == finalized_hash)
|
||||||
|
{
|
||||||
|
let finalized_block = blocks_to_remove.swap_remove(position);
|
||||||
|
self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut blocks_to_remove =
|
||||||
|
blocks_to_remove.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>();
|
||||||
|
while let Some(block) = blocks_to_remove.pop_front() {
|
||||||
|
if let Some((removed, children)) = self.remove_by_hash(block) {
|
||||||
|
debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain child block");
|
||||||
|
blocks_to_remove.extend(children);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Remove all blocks up to __and including__ the given block number.
|
/// Remove all blocks up to __and including__ the given block number.
|
||||||
///
|
///
|
||||||
/// If a finalized hash is provided, the only non-canonical blocks which will be removed are
|
/// If a finalized hash is provided, the only non-canonical blocks which will be removed are
|
||||||
@ -214,17 +316,18 @@ impl TreeState {
|
|||||||
pub(crate) fn remove_until(
|
pub(crate) fn remove_until(
|
||||||
&mut self,
|
&mut self,
|
||||||
upper_bound: BlockNumber,
|
upper_bound: BlockNumber,
|
||||||
finalized_num: Option<BlockNumber>,
|
last_persisted_hash: B256,
|
||||||
|
finalized_num_hash: Option<BlockNumHash>,
|
||||||
) {
|
) {
|
||||||
debug!(target: "engine", ?upper_bound, ?finalized_num, "Removing blocks from the tree");
|
debug!(target: "engine", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
|
||||||
|
|
||||||
// If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
|
// If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
|
||||||
// that the only blocks removed, are canonical blocks less than the upper bound
|
// that the only blocks removed, are canonical blocks less than the upper bound
|
||||||
// finalized_num.take_if(|finalized| *finalized > upper_bound);
|
// finalized_num.take_if(|finalized| *finalized > upper_bound);
|
||||||
let finalized_num = finalized_num.map(|finalized| {
|
let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
|
||||||
let new_finalized_num = finalized.min(upper_bound);
|
finalized.number = finalized.number.min(upper_bound);
|
||||||
debug!(target: "engine", ?new_finalized_num, "Adjusted upper bound");
|
debug!(target: "engine", ?finalized, "Adjusted upper bound");
|
||||||
new_finalized_num
|
finalized
|
||||||
});
|
});
|
||||||
|
|
||||||
// We want to do two things:
|
// We want to do two things:
|
||||||
@ -234,60 +337,12 @@ impl TreeState {
|
|||||||
// * remove all canonical blocks below the upper bound
|
// * remove all canonical blocks below the upper bound
|
||||||
// * fetch the number of the finalized hash, removing any sidechains that are __below__ the
|
// * fetch the number of the finalized hash, removing any sidechains that are __below__ the
|
||||||
// finalized block
|
// finalized block
|
||||||
|
self.remove_canonical_until(upper_bound, last_persisted_hash);
|
||||||
// First, let's walk back the canonical chain and remove canonical blocks lower than the
|
|
||||||
// upper bound
|
|
||||||
let mut current_block = self.current_canonical_head.hash;
|
|
||||||
while let Some(executed) = self.blocks_by_hash.get(¤t_block) {
|
|
||||||
current_block = executed.block.parent_hash;
|
|
||||||
if executed.block.number <= upper_bound {
|
|
||||||
if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
|
|
||||||
// finally, move the trie updates
|
|
||||||
self.persisted_trie_updates
|
|
||||||
.insert(removed.block.hash(), (removed.block.number, removed.trie));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now, we have removed canonical blocks (assuming the upper bound is above the finalized
|
// Now, we have removed canonical blocks (assuming the upper bound is above the finalized
|
||||||
// block) and only have sidechains below the finalized block.
|
// block) and only have sidechains below the finalized block.
|
||||||
if let Some(finalized) = finalized_num {
|
if let Some(finalized_num_hash) = finalized_num_hash {
|
||||||
// We remove disconnected sidechains in three steps:
|
self.prune_finalized_sidechains(finalized_num_hash);
|
||||||
// * first, remove everything with a block number __below__ the finalized block.
|
|
||||||
// * next, we populate a vec with parents __at__ the finalized block.
|
|
||||||
// * finally, we iterate through the vec, removing children until the vec is empty
|
|
||||||
// (BFS).
|
|
||||||
|
|
||||||
// We _exclude_ the finalized block because we will be dealing with the blocks __at__
|
|
||||||
// the finalized block later.
|
|
||||||
let blocks_to_remove = self
|
|
||||||
.blocks_by_number
|
|
||||||
.range((Bound::Unbounded, Bound::Excluded(finalized)))
|
|
||||||
.flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
for hash in blocks_to_remove {
|
|
||||||
self.remove_by_hash(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove trie updates that are below the finalized block
|
|
||||||
self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num <= finalized);
|
|
||||||
|
|
||||||
// The only blocks that exist at `finalized_num` now, are blocks in sidechains that
|
|
||||||
// should be removed.
|
|
||||||
//
|
|
||||||
// We first put their children into this vec.
|
|
||||||
// Then, we will iterate over them, removing them, adding their children, etc etc,
|
|
||||||
// until the vec is empty.
|
|
||||||
let mut blocks_to_remove = self
|
|
||||||
.blocks_by_number
|
|
||||||
.remove(&finalized)
|
|
||||||
.map(|blocks| blocks.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>())
|
|
||||||
.unwrap_or_default();
|
|
||||||
while let Some(block) = blocks_to_remove.pop_front() {
|
|
||||||
if let Some((_, children)) = self.remove_by_hash(block) {
|
|
||||||
blocks_to_remove.extend(children);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -531,7 +586,7 @@ where
|
|||||||
last_persisted_block_hash: header.hash(),
|
last_persisted_block_hash: header.hash(),
|
||||||
last_persisted_block_number: best_block_number,
|
last_persisted_block_number: best_block_number,
|
||||||
rx: None,
|
rx: None,
|
||||||
remove_above_state: None,
|
remove_above_state: VecDeque::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
|
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
|
||||||
@ -1053,11 +1108,14 @@ where
|
|||||||
/// or send a new persistence action if necessary.
|
/// or send a new persistence action if necessary.
|
||||||
fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
|
fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
|
||||||
if !self.persistence_state.in_progress() {
|
if !self.persistence_state.in_progress() {
|
||||||
if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() {
|
if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
|
||||||
debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task");
|
debug!(target: "engine", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block_number, "Removing blocks using persistence task");
|
||||||
|
if new_tip_num < self.persistence_state.last_persisted_block_number {
|
||||||
|
debug!(target: "engine", ?new_tip_num, "Starting remove blocks job");
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
|
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
|
||||||
self.persistence_state.start(rx);
|
self.persistence_state.start(rx);
|
||||||
|
}
|
||||||
} else if self.should_persist() {
|
} else if self.should_persist() {
|
||||||
let blocks_to_persist = self.get_canonical_blocks_to_persist();
|
let blocks_to_persist = self.get_canonical_blocks_to_persist();
|
||||||
if blocks_to_persist.is_empty() {
|
if blocks_to_persist.is_empty() {
|
||||||
@ -1078,22 +1136,23 @@ where
|
|||||||
.expect("if a persistence task is in progress Receiver must be Some");
|
.expect("if a persistence task is in progress Receiver must be Some");
|
||||||
// Check if persistence has complete
|
// Check if persistence has complete
|
||||||
match rx.try_recv() {
|
match rx.try_recv() {
|
||||||
Ok(last_persisted_block_hash) => {
|
Ok(last_persisted_hash_num) => {
|
||||||
self.metrics.engine.persistence_duration.record(start_time.elapsed());
|
self.metrics.engine.persistence_duration.record(start_time.elapsed());
|
||||||
let Some(last_persisted_block_hash) = last_persisted_block_hash else {
|
let Some(BlockNumHash {
|
||||||
|
hash: last_persisted_block_hash,
|
||||||
|
number: last_persisted_block_number,
|
||||||
|
}) = last_persisted_hash_num
|
||||||
|
else {
|
||||||
// if this happened, then we persisted no blocks because we sent an
|
// if this happened, then we persisted no blocks because we sent an
|
||||||
// empty vec of blocks
|
// empty vec of blocks
|
||||||
warn!(target: "engine", "Persistence task completed but did not persist any blocks");
|
warn!(target: "engine", "Persistence task completed but did not persist any blocks");
|
||||||
return Ok(())
|
return Ok(())
|
||||||
};
|
};
|
||||||
if let Some(block) =
|
|
||||||
self.state.tree_state.block_by_hash(last_persisted_block_hash)
|
trace!(target: "engine", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
|
||||||
{
|
self.persistence_state
|
||||||
self.persistence_state.finish(last_persisted_block_hash, block.number);
|
.finish(last_persisted_block_hash, last_persisted_block_number);
|
||||||
self.on_new_persisted_block();
|
self.on_new_persisted_block();
|
||||||
} else {
|
|
||||||
error!("could not find persisted block with hash {last_persisted_block_hash} in memory");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
|
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
|
||||||
Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
|
Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
|
||||||
@ -1211,7 +1270,16 @@ where
|
|||||||
//
|
//
|
||||||
// We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
|
// We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
|
||||||
// before that
|
// before that
|
||||||
self.state.tree_state.remove_until(backfill_height, Some(backfill_height));
|
let backfill_num_hash = self
|
||||||
|
.provider
|
||||||
|
.block_hash(backfill_height)?
|
||||||
|
.map(|hash| BlockNumHash { hash, number: backfill_height });
|
||||||
|
|
||||||
|
self.state.tree_state.remove_until(
|
||||||
|
backfill_height,
|
||||||
|
self.persistence_state.last_persisted_block_hash,
|
||||||
|
backfill_num_hash,
|
||||||
|
);
|
||||||
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
|
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
|
||||||
|
|
||||||
// remove all buffered blocks below the backfill height
|
// remove all buffered blocks below the backfill height
|
||||||
@ -1389,10 +1457,13 @@ where
|
|||||||
/// Assumes that `finish` has been called on the `persistence_state` at least once
|
/// Assumes that `finish` has been called on the `persistence_state` at least once
|
||||||
fn on_new_persisted_block(&mut self) {
|
fn on_new_persisted_block(&mut self) {
|
||||||
let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
|
let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
|
||||||
|
debug!(target: "engine", last_persisted_hash=?self.persistence_state.last_persisted_block_hash, last_persisted_number=?self.persistence_state.last_persisted_block_number, ?finalized, "New persisted block, clearing in memory blocks");
|
||||||
self.remove_before(self.persistence_state.last_persisted_block_number, finalized)
|
self.remove_before(self.persistence_state.last_persisted_block_number, finalized)
|
||||||
.expect("todo: error handling");
|
.expect("todo: error handling");
|
||||||
self.canonical_in_memory_state
|
self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
|
||||||
.remove_persisted_blocks(self.persistence_state.last_persisted_block_number);
|
number: self.persistence_state.last_persisted_block_number,
|
||||||
|
hash: self.persistence_state.last_persisted_block_hash,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an [`ExecutedBlock`] from database or in-memory state by hash.
|
/// Return an [`ExecutedBlock`] from database or in-memory state by hash.
|
||||||
@ -1402,8 +1473,8 @@ where
|
|||||||
/// has in memory.
|
/// has in memory.
|
||||||
///
|
///
|
||||||
/// For finalized blocks, this will return `None`.
|
/// For finalized blocks, this will return `None`.
|
||||||
#[allow(unused)]
|
|
||||||
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock>> {
|
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock>> {
|
||||||
|
trace!(target: "engine", ?hash, "Fetching executed block by hash");
|
||||||
// check memory first
|
// check memory first
|
||||||
let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
|
let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
|
||||||
|
|
||||||
@ -1858,6 +1929,16 @@ where
|
|||||||
let tip = chain_update.tip().header.clone();
|
let tip = chain_update.tip().header.clone();
|
||||||
let notification = chain_update.to_chain_notification();
|
let notification = chain_update.to_chain_notification();
|
||||||
|
|
||||||
|
// reinsert any missing reorged blocks
|
||||||
|
if let NewCanonicalChain::Reorg { new, old } = &chain_update {
|
||||||
|
let new_first = new.first().map(|first| first.block.num_hash());
|
||||||
|
let old_first = old.first().map(|first| first.block.num_hash());
|
||||||
|
trace!(target: "engine", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
|
||||||
|
|
||||||
|
self.reinsert_reorged_blocks(new.clone());
|
||||||
|
self.reinsert_reorged_blocks(old.clone());
|
||||||
|
}
|
||||||
|
|
||||||
// update the tracked in-memory state with the new chain
|
// update the tracked in-memory state with the new chain
|
||||||
self.canonical_in_memory_state.update_chain(chain_update);
|
self.canonical_in_memory_state.update_chain(chain_update);
|
||||||
self.canonical_in_memory_state.set_canonical_head(tip.clone());
|
self.canonical_in_memory_state.set_canonical_head(tip.clone());
|
||||||
@ -1872,6 +1953,16 @@ where
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This reinserts any blocks in the new chain that do not already exist in the tree
|
||||||
|
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock>) {
|
||||||
|
for block in new_chain {
|
||||||
|
if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
|
||||||
|
trace!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state");
|
||||||
|
self.state.tree_state.insert_executed(block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
|
/// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
|
||||||
///
|
///
|
||||||
/// This mainly compares the missing parent of the downloaded block with the current canonical
|
/// This mainly compares the missing parent of the downloaded block with the current canonical
|
||||||
@ -2338,10 +2429,17 @@ where
|
|||||||
) -> ProviderResult<()> {
|
) -> ProviderResult<()> {
|
||||||
// first fetch the finalized block number and then call the remove_before method on
|
// first fetch the finalized block number and then call the remove_before method on
|
||||||
// tree_state
|
// tree_state
|
||||||
let num =
|
let num = if let Some(hash) = finalized_hash {
|
||||||
if let Some(hash) = finalized_hash { self.provider.block_number(hash)? } else { None };
|
self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
self.state.tree_state.remove_until(upper_bound, num);
|
self.state.tree_state.remove_until(
|
||||||
|
upper_bound,
|
||||||
|
self.persistence_state.last_persisted_block_hash,
|
||||||
|
num,
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2355,14 +2453,14 @@ pub struct PersistenceState {
|
|||||||
last_persisted_block_hash: B256,
|
last_persisted_block_hash: B256,
|
||||||
/// Receiver end of channel where the result of the persistence task will be
|
/// Receiver end of channel where the result of the persistence task will be
|
||||||
/// sent when done. A None value means there's no persistence task in progress.
|
/// sent when done. A None value means there's no persistence task in progress.
|
||||||
rx: Option<(oneshot::Receiver<Option<B256>>, Instant)>,
|
rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
|
||||||
/// The last persisted block number.
|
/// The last persisted block number.
|
||||||
///
|
///
|
||||||
/// This tracks the chain height that is persisted on disk
|
/// This tracks the chain height that is persisted on disk
|
||||||
last_persisted_block_number: u64,
|
last_persisted_block_number: u64,
|
||||||
/// The block above which blocks should be removed from disk, because there has been an on disk
|
/// The block above which blocks should be removed from disk, because there has been an on disk
|
||||||
/// reorg.
|
/// reorg.
|
||||||
remove_above_state: Option<u64>,
|
remove_above_state: VecDeque<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PersistenceState {
|
impl PersistenceState {
|
||||||
@ -2373,14 +2471,15 @@ impl PersistenceState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Sets state for a started persistence task.
|
/// Sets state for a started persistence task.
|
||||||
fn start(&mut self, rx: oneshot::Receiver<Option<B256>>) {
|
fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
|
||||||
self.rx = Some((rx, Instant::now()));
|
self.rx = Some((rx, Instant::now()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the `remove_above_state`, to the new tip number specified.
|
/// Sets the `remove_above_state`, to the new tip number specified, only if it is less than the
|
||||||
|
/// current `last_persisted_block_number`.
|
||||||
fn schedule_removal(&mut self, new_tip_num: u64) {
|
fn schedule_removal(&mut self, new_tip_num: u64) {
|
||||||
// TODO: what about multiple on-disk reorgs in a row?
|
debug!(target: "engine", ?new_tip_num, prev_remove_state=?self.remove_above_state, last_persisted_block_number=?self.last_persisted_block_number, "Scheduling removal");
|
||||||
self.remove_above_state = Some(new_tip_num);
|
self.remove_above_state.push_back(new_tip_num);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets state for a finished persistence task.
|
/// Sets state for a finished persistence task.
|
||||||
@ -2989,7 +3088,8 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_tree_state_remove_before() {
|
async fn test_tree_state_remove_before() {
|
||||||
let mut tree_state = TreeState::new(BlockNumHash::default());
|
let start_num_hash = BlockNumHash::default();
|
||||||
|
let mut tree_state = TreeState::new(start_num_hash);
|
||||||
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
||||||
|
|
||||||
for block in &blocks {
|
for block in &blocks {
|
||||||
@ -3002,7 +3102,7 @@ mod tests {
|
|||||||
tree_state.set_canonical_head(last.block.num_hash());
|
tree_state.set_canonical_head(last.block.num_hash());
|
||||||
|
|
||||||
// inclusive bound, so we should remove anything up to and including 2
|
// inclusive bound, so we should remove anything up to and including 2
|
||||||
tree_state.remove_until(2, Some(2));
|
tree_state.remove_until(2, start_num_hash.hash, Some(blocks[1].block.num_hash()));
|
||||||
|
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
||||||
@ -3034,7 +3134,8 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_tree_state_remove_before_finalized() {
|
async fn test_tree_state_remove_before_finalized() {
|
||||||
let mut tree_state = TreeState::new(BlockNumHash::default());
|
let start_num_hash = BlockNumHash::default();
|
||||||
|
let mut tree_state = TreeState::new(start_num_hash);
|
||||||
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
||||||
|
|
||||||
for block in &blocks {
|
for block in &blocks {
|
||||||
@ -3047,7 +3148,7 @@ mod tests {
|
|||||||
tree_state.set_canonical_head(last.block.num_hash());
|
tree_state.set_canonical_head(last.block.num_hash());
|
||||||
|
|
||||||
// we should still remove everything up to and including 2
|
// we should still remove everything up to and including 2
|
||||||
tree_state.remove_until(2, None);
|
tree_state.remove_until(2, start_num_hash.hash, None);
|
||||||
|
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
||||||
@ -3079,7 +3180,8 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_tree_state_remove_before_lower_finalized() {
|
async fn test_tree_state_remove_before_lower_finalized() {
|
||||||
let mut tree_state = TreeState::new(BlockNumHash::default());
|
let start_num_hash = BlockNumHash::default();
|
||||||
|
let mut tree_state = TreeState::new(start_num_hash);
|
||||||
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
|
||||||
|
|
||||||
for block in &blocks {
|
for block in &blocks {
|
||||||
@ -3092,7 +3194,7 @@ mod tests {
|
|||||||
tree_state.set_canonical_head(last.block.num_hash());
|
tree_state.set_canonical_head(last.block.num_hash());
|
||||||
|
|
||||||
// we have no forks so we should still remove anything up to and including 2
|
// we have no forks so we should still remove anything up to and including 2
|
||||||
tree_state.remove_until(2, Some(1));
|
tree_state.remove_until(2, start_num_hash.hash, Some(blocks[0].block.num_hash()));
|
||||||
|
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
|
||||||
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
|
||||||
|
|||||||
@ -260,16 +260,23 @@ where
|
|||||||
|
|
||||||
// Get the total txs for the block range, so we have the correct number of columns for
|
// Get the total txs for the block range, so we have the correct number of columns for
|
||||||
// receipts and transactions
|
// receipts and transactions
|
||||||
|
// IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
|
||||||
let tx_range = self
|
let tx_range = self
|
||||||
.database()
|
.database()
|
||||||
.transaction_range_by_block_range(block_number..=highest_static_file_block)?;
|
.transaction_range_by_block_range(block_number + 1..=highest_static_file_block)?;
|
||||||
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
|
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
|
||||||
|
|
||||||
|
// IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
|
||||||
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
|
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
|
||||||
self.database().remove_block_and_execution_range(
|
self.database().remove_block_and_execution_range(
|
||||||
block_number..=self.database().last_block_number()?,
|
block_number + 1..=self.database().last_block_number()?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
|
||||||
|
// we remove only what is ABOVE the block.
|
||||||
|
//
|
||||||
|
// i.e., if the highest static file block is 8, we want to remove above block 5 only, we
|
||||||
|
// will have three blocks to remove, which will be block 8, 7, and 6.
|
||||||
debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number");
|
debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number");
|
||||||
self.static_file()
|
self.static_file()
|
||||||
.get_writer(block_number, StaticFileSegment::Headers)?
|
.get_writer(block_number, StaticFileSegment::Headers)?
|
||||||
|
|||||||
Reference in New Issue
Block a user