feat(tree): unconnected block buffering (#2397)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
rakita
2023-04-28 23:46:45 +02:00
committed by GitHub
parent b8b2772932
commit 435df44127
16 changed files with 657 additions and 107 deletions

View File

@ -19,11 +19,10 @@ reth-interfaces = { path = "../interfaces" }
reth-db = { path = "../storage/db" }
reth-provider = { path = "../storage/provider" }
# tracing
tracing = "0.1"
# common
parking_lot = { version = "0.12"}
lru = "0.10"
tracing = "0.1"
# misc
aquamarine = "0.3.0"

View File

@ -0,0 +1,398 @@
use lru::LruCache;
use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders};
use std::{
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
num::NonZeroUsize,
};
/// Type that contains blocks by number and hash.
pub type BufferedBlocks = BTreeMap<BlockNumber, HashMap<BlockHash, SealedBlockWithSenders>>;

/// Contains the tree of pending blocks that are not executed but buffered.
/// It allows us to store unconnected blocks for potential future inclusion.
///
/// It has three main functions:
/// * `insert_block` for inserting blocks into the buffer.
/// * `take_all_childrens` for draining blocks once the parent gets received and inserted.
/// * `clean_old_blocks` to clear old blocks that are below the finalized line.
///
/// Note: the buffer is limited by the number of blocks that it can contain, and eviction
/// is done by least recently used block.
#[derive(Debug)]
pub struct BlockBuffer {
    /// Blocks ordered by block number inside the BTreeMap.
    ///
    /// Note: BTreeMap is used so that we can remove the old finalized blocks
    /// from the buffer.
    blocks: BufferedBlocks,
    /// Needed for removal of blocks, and to connect a potential unconnected block
    /// to a connected one.
    parent_to_child: HashMap<BlockHash, HashSet<BlockNumHash>>,
    /// LRU used for tracking the oldest inserted blocks, which are first in line for
    /// eviction if the `max_blocks` limit is hit.
    ///
    /// Also used as a counter of the number of blocks inside the buffer.
    lru: LruCache<BlockNumHash, ()>,
}
impl BlockBuffer {
    /// Create a new buffer holding at most `limit` blocks.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero (the LRU capacity must be non-zero).
    pub fn new(limit: usize) -> Self {
        Self {
            blocks: Default::default(),
            parent_to_child: Default::default(),
            lru: LruCache::new(NonZeroUsize::new(limit).unwrap()),
        }
    }

    /// Insert a block into the buffer.
    ///
    /// If the buffer is at capacity, the least recently inserted block is evicted and
    /// unlinked from both internal indices.
    pub fn insert_block(&mut self, block: SealedBlockWithSenders) {
        let num_hash = block.num_hash();

        self.parent_to_child.entry(block.parent_hash).or_default().insert(block.num_hash());
        self.blocks.entry(block.number).or_default().insert(block.hash, block);

        // `LruCache::push` also returns the old value when the same key is re-inserted;
        // the filter discards that (non-eviction) case so only real evictions remain.
        if let Some((evicted_num_hash, _)) =
            self.lru.push(num_hash, ()).filter(|(b, _)| *b != num_hash)
        {
            // limit is hit, evict the block and remove it from both indices
            if let Some(evicted_block) = self.remove_from_blocks(&evicted_num_hash) {
                self.remove_from_parent(evicted_block.parent_hash, &evicted_num_hash);
            }
        }
    }

    /// Take out the given block (if buffered) and all of its children and their
    /// descendants. This is used to get all blocks that depend on the block that
    /// just got included.
    ///
    /// Note: the order of returned blocks is important, blocks with a lower block number
    /// in the chain come first so that they can be executed in the correct order.
    pub fn take_all_childrens(&mut self, parent: BlockNumHash) -> Vec<SealedBlockWithSenders> {
        // remove the parent block itself if present
        let mut taken = Vec::new();
        if let Some(block) = self.remove_from_blocks(&parent) {
            // unlink the taken block from its own parent's child set so that no stale
            // entry is left behind in `parent_to_child`.
            self.remove_from_parent(block.parent_hash, &parent);
            taken.push(block);
        }

        taken.extend(self.remove_children(vec![parent]).into_iter());
        taken
    }

    /// Clean up the old blocks from the buffer, as blocks before finalization are not
    /// needed anymore. We can discard them from the buffer.
    pub fn clean_old_blocks(&mut self, finalized_number: BlockNumber) {
        let mut remove_parent_children = Vec::new();

        // discard all blocks that are at or before the finalized number.
        while let Some(entry) = self.blocks.first_entry() {
            if *entry.key() > finalized_number {
                break
            }
            let blocks = entry.remove();
            for (hash, block) in blocks {
                let num_hash = BlockNumHash::new(block.number, hash);
                // remove the finalized block from the lru and from its parent's child
                // set, so no stale `parent_to_child` entries are left behind.
                self.lru.pop(&num_hash);
                self.remove_from_parent(block.parent_hash, &num_hash);
                remove_parent_children.push(num_hash);
            }
        }

        // remove all descendants of the discarded blocks.
        self.remove_children(remove_parent_children);
    }

    /// Return a reference to the buffered blocks.
    pub fn blocks(&self) -> &BufferedBlocks {
        &self.blocks
    }

    /// Return a reference to the requested block, if it is buffered.
    pub fn block(&self, block: BlockNumHash) -> Option<&SealedBlockWithSenders> {
        self.blocks.get(&block.number)?.get(&block.hash)
    }

    /// Return the number of blocks inside the buffer.
    pub fn len(&self) -> usize {
        self.lru.len()
    }

    /// Return `true` if the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.lru.is_empty()
    }

    /// Remove the block from its parent's child set. Does not touch the block's own
    /// children.
    fn remove_from_parent(&mut self, parent: BlockHash, block: &BlockNumHash) {
        // remove from the parent-to-child connection, but only for this block's parent.
        if let hash_map::Entry::Occupied(mut entry) = self.parent_to_child.entry(parent) {
            entry.get_mut().remove(block);
            // if the set is empty, remove the whole entry.
            if entry.get().is_empty() {
                entry.remove();
            }
        }
    }

    /// Remove a block from `self.blocks`. This will also remove the block from `self.lru`.
    ///
    /// Note: this function will not remove the block from the `self.parent_to_child`
    /// connection.
    fn remove_from_blocks(&mut self, block: &BlockNumHash) -> Option<SealedBlockWithSenders> {
        if let Entry::Occupied(mut entry) = self.blocks.entry(block.number) {
            let ret = entry.get_mut().remove(&block.hash);
            // if the per-number map is empty, remove the whole number entry.
            if entry.get().is_empty() {
                entry.remove();
            }
            self.lru.pop(block);
            return ret
        }
        None
    }

    /// Remove all children and their descendants for the given blocks and return them.
    fn remove_children(&mut self, parent_blocks: Vec<BlockNumHash>) -> Vec<SealedBlockWithSenders> {
        // remove all parent-to-child connections and all descendant blocks that are
        // connected to the discarded parent blocks.
        let mut remove_parent_children = parent_blocks;
        let mut removed_blocks = Vec::new();
        while let Some(parent_num_hash) = remove_parent_children.pop() {
            // remove this block's children from the buffer and queue them for the same
            // treatment, so whole subtrees get drained.
            if let Some(parent_childrens) = self.parent_to_child.remove(&parent_num_hash.hash) {
                for child in parent_childrens.iter() {
                    if let Some(block) = self.remove_from_blocks(child) {
                        removed_blocks.push(block);
                    }
                }
                remove_parent_children.extend(parent_childrens.into_iter());
            }
        }
        removed_blocks
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use reth_interfaces::test_utils::generators::random_block;
    use reth_primitives::{BlockHash, BlockNumHash, SealedBlockWithSenders};

    use crate::BlockBuffer;

    /// Create a sealed block (with recovered senders) with the given number and parent hash.
    fn create_block(number: u64, parent: BlockHash) -> SealedBlockWithSenders {
        let block = random_block(number, Some(parent), None, None);
        block.seal_with_senders().unwrap()
    }

    #[test]
    fn simple_insertion() {
        let block1 = create_block(10, BlockHash::random());
        let mut buffer = BlockBuffer::new(3);

        buffer.insert_block(block1.clone());
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.block(block1.num_hash()), Some(&block1));
    }

    #[test]
    fn take_all_chain_of_childrens() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(12, block2.hash);
        // block4 is unrelated to the block1..block3 chain and must stay buffered.
        let block4 = create_block(14, BlockHash::random());

        let mut buffer = BlockBuffer::new(5);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        assert_eq!(buffer.len(), 4);
        // draining the chain rooted at main_parent returns it in execution order.
        assert_eq!(buffer.take_all_childrens(main_parent), vec![block1, block2, block3]);
        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn take_all_multi_level_childrens() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        // block2 and block3 are sibling children of block1.
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(11, block1.hash);
        let block4 = create_block(12, block2.hash);

        let mut buffer = BlockBuffer::new(5);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        assert_eq!(buffer.len(), 4);
        // compare as a map since sibling order within a level is unspecified.
        assert_eq!(
            buffer
                .take_all_childrens(main_parent)
                .into_iter()
                .map(|b| (b.hash, b))
                .collect::<HashMap<_, _>>(),
            HashMap::from([
                (block1.hash, block1),
                (block2.hash, block2),
                (block3.hash, block3),
                (block4.hash, block4)
            ])
        );
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn take_self_with_childs() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(11, block1.hash);
        let block4 = create_block(12, block2.hash);

        let mut buffer = BlockBuffer::new(5);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        assert_eq!(buffer.len(), 4);
        // taking by block1's own num_hash returns block1 itself plus its subtree.
        assert_eq!(
            buffer
                .take_all_childrens(block1.num_hash())
                .into_iter()
                .map(|b| (b.hash, b))
                .collect::<HashMap<_, _>>(),
            HashMap::from([
                (block1.hash, block1),
                (block2.hash, block2),
                (block3.hash, block3),
                (block4.hash, block4)
            ])
        );
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn clean_chain_of_childres() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(12, block2.hash);
        // block4 has an unknown parent and survives the cleanup below.
        let block4 = create_block(14, BlockHash::random());

        let mut buffer = BlockBuffer::new(5);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        assert_eq!(buffer.len(), 4);
        // finalizing at block1's number discards block1 and its descendants.
        buffer.clean_old_blocks(block1.number);
        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn clean_all_multi_level_childrens() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(11, block1.hash);
        let block4 = create_block(12, block2.hash);

        let mut buffer = BlockBuffer::new(5);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        assert_eq!(buffer.len(), 4);
        // everything descends from block1, so finalizing at its number drains the buffer.
        buffer.clean_old_blocks(block1.number);
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn clean_multi_chains() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block1a = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block2a = create_block(11, block1.hash);
        // unrelated blocks with random (unknown) parents.
        let random_block1 = create_block(10, BlockHash::random());
        let random_block2 = create_block(11, BlockHash::random());
        let random_block3 = create_block(12, BlockHash::random());

        let mut buffer = BlockBuffer::new(10);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block1a.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block2a.clone());
        buffer.insert_block(random_block1.clone());
        buffer.insert_block(random_block2.clone());
        buffer.insert_block(random_block3.clone());

        assert_eq!(buffer.len(), 7);
        // finalizing at 10 removes all number-10 blocks and their descendants;
        // only random_block2 and random_block3 remain.
        buffer.clean_old_blocks(10);
        assert_eq!(buffer.len(), 2);
    }

    /// Assert that the block is absent from both internal indices of the buffer.
    /// NOTE(review): despite the name, this asserts NON-existence (both lookups are
    /// checked with `is_none`).
    fn assert_block_existance(buffer: &BlockBuffer, block: &SealedBlockWithSenders) {
        assert!(buffer.blocks.get(&block.number).and_then(|t| t.get(&block.hash)).is_none());
        assert!(buffer
            .parent_to_child
            .get(&block.parent_hash)
            .and_then(|p| p.get(&block.num_hash()))
            .is_none());
    }

    #[test]
    fn evict_with_gap() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(12, block2.hash);
        let block4 = create_block(13, BlockHash::random());

        // limit of 3, inserting a 4th block triggers an eviction.
        let mut buffer = BlockBuffer::new(3);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        // block1 gets evicted
        assert_block_existance(&buffer, &block1);

        assert_eq!(buffer.len(), 3);
    }

    #[test]
    fn simple_eviction() {
        let main_parent = BlockNumHash::new(9, BlockHash::random());
        let block1 = create_block(10, main_parent.hash);
        let block2 = create_block(11, block1.hash);
        let block3 = create_block(12, block2.hash);
        let block4 = create_block(13, BlockHash::random());

        let mut buffer = BlockBuffer::new(3);

        buffer.insert_block(block1.clone());
        buffer.insert_block(block2.clone());
        buffer.insert_block(block3.clone());
        buffer.insert_block(block4.clone());

        // block1 (the oldest entry in the LRU) gets evicted, as the assertion below checks.
        assert_block_existance(&buffer, &block1);

        assert_eq!(buffer.len(), 3);
    }
}

View File

@ -118,11 +118,15 @@ impl BlockIndices {
pub fn update_block_hashes(
&mut self,
hashes: BTreeMap<u64, BlockHash>,
) -> BTreeSet<BlockChainId> {
let mut new_hashes = hashes.iter();
) -> (BTreeSet<BlockChainId>, Vec<BlockNumHash>) {
// set new canonical hashes.
self.canonical_chain = hashes.clone();
let mut new_hashes = hashes.into_iter();
let mut old_hashes = self.canonical_chain().clone().into_iter();
let mut remove = Vec::new();
let mut removed = Vec::new();
let mut added = Vec::new();
let mut new_hash = new_hashes.next();
let mut old_hash = old_hashes.next();
@ -130,6 +134,11 @@ impl BlockIndices {
loop {
let Some(old_block_value) = old_hash else {
// end of old_hashes canonical chain. New chain has more blocks than the old chain.
while let Some(new) = new_hash {
// add new blocks to added list.
added.push(new.into());
new_hash = new_hashes.next();
}
break
};
let Some(new_block_value) = new_hash else {
@ -137,7 +146,7 @@ impl BlockIndices {
// remove all present block.
// this is mostly not going to happen as reorg should make new chain in Tree.
while let Some(rem) = old_hash {
remove.push(rem);
removed.push(rem);
old_hash = old_hashes.next();
}
break;
@ -146,29 +155,34 @@ impl BlockIndices {
match new_block_value.0.cmp(&old_block_value.0) {
std::cmp::Ordering::Less => {
// new chain has more past blocks than old chain
added.push(new_block_value.into());
new_hash = new_hashes.next();
}
std::cmp::Ordering::Equal => {
if *new_block_value.1 != old_block_value.1 {
if new_block_value.1 != old_block_value.1 {
// remove block hash as it is different
remove.push(old_block_value);
removed.push(old_block_value);
added.push(new_block_value.into());
}
new_hash = new_hashes.next();
old_hash = old_hashes.next();
}
std::cmp::Ordering::Greater => {
// old chain has more past blocks that new chain
remove.push(old_block_value);
removed.push(old_block_value);
old_hash = old_hashes.next()
}
}
}
self.canonical_chain = hashes;
remove.into_iter().fold(BTreeSet::new(), |mut fold, (number, hash)| {
fold.extend(self.remove_block(number, hash));
fold
})
// remove childs of removed blocks
(
removed.into_iter().fold(BTreeSet::new(), |mut fold, (number, hash)| {
fold.extend(self.remove_block(number, hash));
fold
}),
added,
)
}
/// Remove chain from indices and return dependent chains that needs to be removed.

View File

@ -1,10 +1,15 @@
//! Implementation of [`BlockchainTree`]
use crate::{
chain::BlockChainId, AppendableChain, BlockBuffer, BlockIndices, BlockchainTreeConfig,
PostStateData, TreeExternals,
};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::{
blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error,
};
use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders, U256,
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders,
U256,
};
use reth_provider::{
chain::{ChainSplit, SplitAt},
@ -16,12 +21,7 @@ use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use tracing::trace;
use crate::{
chain::BlockChainId, AppendableChain, BlockIndices, BlockchainTreeConfig, PostStateData,
TreeExternals,
};
use tracing::{info, trace};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Tree of chains and its identifications.
@ -67,6 +67,8 @@ use crate::{
pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
/// The tracked chains and their current data.
chains: HashMap<BlockChainId, AppendableChain>,
/// Unconnected block buffer.
buffered_blocks: BlockBuffer,
/// Static blockchain ID generator
block_chain_id_generator: u64,
/// Indices to block and their connection to the canonical chain.
@ -118,6 +120,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
Ok(Self {
externals,
buffered_blocks: BlockBuffer::new(config.max_unconnected_blocks()),
block_chain_id_generator: 0,
chains: Default::default(),
block_indices: BlockIndices::new(
@ -145,15 +148,6 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
})
}
// we will not even try to insert blocks that are too far in the future.
if block.number > last_finalized_block + self.config.max_blocks_in_chain() {
return Err(ExecError::PendingBlockIsInFuture {
block_number: block.number,
block_hash: block.hash(),
last_finalized: last_finalized_block,
})
}
Ok(())
}
@ -227,11 +221,13 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
&mut self,
block: SealedBlockWithSenders,
) -> Result<BlockStatus, Error> {
// check if block parent can be found in Tree
let parent = block.parent_num_hash();
let block_num_hash = block.num_hash();
// Create a new sidechain by forking the given chain, or append the block if the parent
// block is the top of the given chain.
if let Some(chain_id) = self.block_indices.get_blocks_chain_id(&block.parent_hash) {
// check if block parent can be found in Tree
if let Some(chain_id) = self.block_indices.get_blocks_chain_id(&parent.hash) {
// Create a new sidechain by forking the given chain, or append the block if the parent
// block is the top of the given chain.
let block_hashes = self.all_chain_hashes(chain_id);
// get canonical fork.
@ -249,7 +245,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
let canonical_block_hashes = self.block_indices.canonical_chain();
// append the block if it is continuing the chain.
return if chain_tip == block.parent_hash {
let status = if chain_tip == block.parent_hash {
let block_hash = block.hash();
let block_number = block.number;
parent_chain.append_block(
@ -272,17 +268,19 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
)?;
self.insert_chain(chain);
Ok(BlockStatus::Accepted)
}
};
self.try_connect_buffered_blocks(block_num_hash);
return status
}
// if not found, check if the parent can be found inside canonical chain.
if Some(block.parent_hash) == self.block_indices.canonical_hash(block.number - 1) {
if Some(parent.hash) == self.block_indices.canonical_hash(parent.number) {
// create new chain that points to that block
//return self.fork_canonical_chain(block.clone());
// TODO save pending block to database
// https://github.com/paradigmxyz/reth/issues/1713
let db = self.externals.shareable_db();
let fork_block = ForkBlock { number: block.number - 1, hash: block.parent_hash };
// Validate that the block is post merge
let parent_td = db
@ -312,16 +310,22 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
&block,
&parent_header,
canonical_block_hashes,
fork_block,
parent,
&self.externals,
)?;
self.insert_chain(chain);
self.try_connect_buffered_blocks(block_num_hash);
return Ok(block_status)
}
// NOTE: Block doesn't have a parent, and if we receive this block in `make_canonical`
// function this could be a trigger to initiate p2p syncing, as we are missing the
// parent.
// if there is a parent inside the buffer, validate against it.
if let Some(buffered_parent) = self.buffered_blocks.block(parent) {
self.externals.consensus.validate_header_against_parent(&block, buffered_parent)?
}
// insert block inside unconnected block buffer. Delaying it execution.
self.buffered_blocks.insert_block(block);
Ok(BlockStatus::Disconnected)
}
@ -423,6 +427,38 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
self.insert_in_range_block_with_senders(block)
}
/// Insert block for future execution.
pub fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), Error> {
self.validate_block(&block)?;
self.buffered_blocks.insert_block(block);
Ok(())
}
/// Validate if the block is correct: checks the header against total difficulty, the
/// header's standalone rules, and the block body (pre-execution consensus checks).
fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), Error> {
if let Err(e) =
self.externals.consensus.validate_header_with_total_difficulty(block, U256::MAX)
{
info!(
"Failed to validate header for TD related check with error: {e:?}, block:{:?}",
block
);
return Err(e.into())
}
if let Err(e) = self.externals.consensus.validate_header(block) {
info!("Failed to validate header with error: {e:?}, block:{:?}", block);
return Err(e.into())
}
if let Err(e) = self.externals.consensus.validate_block(block) {
info!("Failed to validate blocks with error: {e:?}, block:{:?}", block);
return Err(e.into())
}
Ok(())
}
/// Same as [BlockchainTree::insert_block_with_senders] but expects that the block is in range,
/// See [BlockchainTree::ensure_block_is_in_range].
pub(crate) fn insert_in_range_block_with_senders(
@ -446,21 +482,28 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
return Ok(BlockStatus::Valid)
}
// validate block hashes
self.validate_block(&block)?;
// try to insert block
self.try_insert_block(block)
}
/// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
pub fn finalize_block(&mut self, finalized_block: BlockNumber) {
// remove blocks
let mut remove_chains = self.block_indices.finalize_canonical_blocks(
finalized_block,
self.config.num_of_additional_canonical_block_hashes(),
);
// remove chains of removed blocks
while let Some(chain_id) = remove_chains.pop_first() {
if let Some(chain) = self.chains.remove(&chain_id) {
remove_chains.extend(self.block_indices.remove_chain(&chain));
}
}
// clean block buffer.
self.buffered_blocks.clean_old_blocks(finalized_block);
}
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
@ -491,7 +534,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
.take(num_of_canonical_hashes as usize)
.collect::<Result<BTreeMap<BlockNumber, BlockHash>, _>>()?;
let mut remove_chains = self.block_indices.update_block_hashes(last_canonical_hashes);
let (mut remove_chains, _) =
self.block_indices.update_block_hashes(last_canonical_hashes.clone());
// remove all chains that got discarded
while let Some(chain_id) = remove_chains.first() {
@ -500,9 +544,35 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
}
// check unconnected block buffer for the childs of new added blocks,
for added_block in last_canonical_hashes.into_iter() {
self.try_connect_buffered_blocks(added_block.into())
}
// check unconnected block buffer for childs of the chains.
let mut all_chain_blocks = Vec::new();
for (_, chain) in self.chains.iter() {
for (&number, blocks) in chain.blocks.iter() {
all_chain_blocks.push(BlockNumHash { number, hash: blocks.hash })
}
}
for block in all_chain_blocks.into_iter() {
self.try_connect_buffered_blocks(block)
}
Ok(())
}
/// Connect unconnected blocks
fn try_connect_buffered_blocks(&mut self, new_block: BlockNumHash) {
let include_blocks = self.buffered_blocks.take_all_childrens(new_block);
// insert child blocks
for block in include_blocks.into_iter() {
// dont fail on error, just ignore the block.
let _ = self.insert_block_with_senders(block);
}
}
/// Split a sidechain at the given point, and return the canonical part of it.
///
/// The pending part of the chain is reinserted into the tree with the same `chain_id`.
@ -687,6 +757,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
#[cfg(test)]
mod tests {
use crate::block_buffer::BufferedBlocks;
use super::*;
use assert_matches::assert_matches;
use reth_db::{
@ -747,6 +819,8 @@ mod tests {
fork_to_child: Option<HashMap<BlockHash, HashSet<BlockHash>>>,
/// Pending blocks
pending_blocks: Option<(BlockNumber, HashSet<BlockHash>)>,
/// Buffered blocks
buffered_blocks: Option<BufferedBlocks>,
}
impl TreeTester {
@ -766,6 +840,11 @@ mod tests {
self
}
fn with_buffered_blocks(mut self, buffered_blocks: BufferedBlocks) -> Self {
self.buffered_blocks = Some(buffered_blocks);
self
}
fn with_pending_blocks(
mut self,
pending_blocks: (BlockNumber, HashSet<BlockHash>),
@ -792,6 +871,9 @@ mod tests {
let hashes = hashes.into_iter().collect::<HashSet<_>>();
assert_eq!((num, hashes), pending_blocks);
}
if let Some(buffered_blocks) = self.buffered_blocks {
assert_eq!(*tree.buffered_blocks.blocks(), buffered_blocks);
}
}
}
@ -800,15 +882,16 @@ mod tests {
let data = BlockChainTestData::default_with_numbers(11, 12);
let (block1, exec1) = data.blocks[0].clone();
let (block2, exec2) = data.blocks[1].clone();
let genesis = data.genesis;
// test pops execution results from vector, so order is from last to first.
let externals = setup_externals(vec![exec2.clone(), exec1.clone(), exec2, exec1]);
// last finalized block would be number 9.
setup_genesis(externals.db.clone(), data.genesis);
setup_genesis(externals.db.clone(), genesis.clone());
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3);
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10);
let mut tree =
BlockchainTree::new(externals, sender, config).expect("failed to create tree");
@ -816,31 +899,29 @@ mod tests {
// genesis block 10 is already canonical
assert_eq!(tree.make_canonical(&H256::zero()), Ok(()));
// insert block2 hits max chain size
assert_eq!(
tree.insert_block_with_senders(block2.clone()),
Err(ExecError::PendingBlockIsInFuture {
block_number: block2.number,
block_hash: block2.hash(),
last_finalized: 9,
}
.into())
);
// make genesis block 10 as finalized
tree.finalize_block(10);
// block 2 parent is not known.
// block 2 parent is not known, block2 is buffered.
assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Disconnected));
// insert block1
assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid));
// already inserted block will return true.
// Buffered block: [block2]
// Trie state:
// |
// g1 (canonical blocks)
// |
TreeTester::default()
.with_buffered_blocks(BTreeMap::from([(
block2.number,
HashMap::from([(block2.hash(), block2.clone())]),
)]))
.assert(&tree);
// insert block1 and buffered block2 is inserted
assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid));
// insert block2
assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Valid));
// Buffered blocks: []
// Trie state:
// b2 (pending block)
// |
@ -857,6 +938,12 @@ mod tests {
.with_pending_blocks((block1.number, HashSet::from([block1.hash])))
.assert(&tree);
// already inserted block will return true.
assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid));
// block two is already inserted.
assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Valid));
// make block1 canonical
assert_eq!(tree.make_canonical(&block1.hash()), Ok(()));
// check notification
@ -882,6 +969,8 @@ mod tests {
.with_fork_to_child(HashMap::from([]))
.assert(&tree);
/**** INSERT SIDE BLOCKS *** */
let mut block1a = block1.clone();
let block1a_hash = H256([0x33; 32]);
block1a.hash = block1a_hash;
@ -1069,6 +1158,19 @@ mod tests {
Ok(CanonStateNotification::Commit{ new})
if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())]));
// insert unconnected block2b
let mut block2b = block2a.clone();
block2b.hash = H256([0x99; 32]);
block2b.parent_hash = H256([0x88; 32]);
assert_eq!(tree.insert_block_with_senders(block2b.clone()), Ok(BlockStatus::Disconnected));
TreeTester::default()
.with_buffered_blocks(BTreeMap::from([(
block2b.number,
HashMap::from([(block2b.hash(), block2b.clone())]),
)]))
.assert(&tree);
// update canonical block to b2, this would make b2a be removed
assert_eq!(tree.restore_canonical_hashes(12), Ok(()));
// Trie state:
@ -1083,6 +1185,7 @@ mod tests {
.with_block_to_chain(HashMap::from([]))
.with_fork_to_child(HashMap::from([]))
.with_pending_blocks((block2.number + 1, HashSet::from([])))
.with_buffered_blocks(BTreeMap::from([]))
.assert(&tree);
}
}

View File

@ -138,10 +138,8 @@ impl AppendableChain {
C: Consensus,
EF: ExecutorFactory,
{
externals.consensus.validate_header_with_total_difficulty(&block, U256::MAX)?;
externals.consensus.validate_header(&block)?;
// some checks are done before blocks comes here.
externals.consensus.validate_header_against_parent(&block, parent_block)?;
externals.consensus.validate_block(&block)?;
let (unseal, senders) = block.into_components();
let unseal = unseal.unseal();

View File

@ -9,6 +9,8 @@ pub struct BlockchainTreeConfig {
max_blocks_in_chain: u64,
/// The number of blocks that can be re-orged (finalization windows)
max_reorg_depth: u64,
/// The number of unconnected blocks that we are buffering
max_unconnected_blocks: usize,
/// For EVM's "BLOCKHASH" opcode we require last 256 block hashes. So we need to specify
/// at least `additional_canonical_block_hashes`+`max_reorg_depth`, for eth that would be
/// 256+64.
@ -25,6 +27,8 @@ impl Default for BlockchainTreeConfig {
max_blocks_in_chain: 65,
// EVM requires that last 256 block hashes are available.
num_of_additional_canonical_block_hashes: 256,
// max unconnected blocks.
max_unconnected_blocks: 200,
}
}
}
@ -35,11 +39,17 @@ impl BlockchainTreeConfig {
max_reorg_depth: u64,
max_blocks_in_chain: u64,
num_of_additional_canonical_block_hashes: u64,
max_unconnected_blocks: usize,
) -> Self {
if max_reorg_depth > max_blocks_in_chain {
panic!("Side chain size should be more then finalization window");
}
Self { max_blocks_in_chain, max_reorg_depth, num_of_additional_canonical_block_hashes }
Self {
max_blocks_in_chain,
max_reorg_depth,
num_of_additional_canonical_block_hashes,
max_unconnected_blocks,
}
}
/// Return the maximum reorg depth.
@ -57,4 +67,9 @@ impl BlockchainTreeConfig {
pub fn num_of_additional_canonical_block_hashes(&self) -> u64 {
self.num_of_additional_canonical_block_hashes
}
/// Return max number of unconnected blocks that we are buffering
pub fn max_unconnected_blocks(&self) -> usize {
self.max_unconnected_blocks
}
}

View File

@ -29,3 +29,7 @@ pub use shareable::ShareableBlockchainTree;
pub mod post_state_data;
pub use post_state_data::{PostStateData, PostStateDataRef};
/// Buffer of not executed blocks.
pub mod block_buffer;
pub use block_buffer::BlockBuffer;

View File

@ -35,9 +35,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> ShareableBlockchainTree<DB
impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
for ShareableBlockchainTree<DB, C, EF>
{
/// Recover senders and call [`BlockchainTreeEngine::insert_block_with_senders`].
fn insert_block(&self, block: SealedBlock) -> Result<BlockStatus, Error> {
trace!(target: "blockchain_tree", ?block, "Inserting block");
fn insert_block_without_senders(&self, block: SealedBlock) -> Result<BlockStatus, Error> {
let mut tree = self.tree.write();
tree.ensure_block_is_in_range(&block)?;
let block = block
@ -46,11 +44,12 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
tree.insert_in_range_block_with_senders(block)
}
fn insert_block_with_senders(
&self,
block: SealedBlockWithSenders,
) -> Result<BlockStatus, Error> {
trace!(target: "blockchain_tree", ?block, "Inserting block with senders");
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), Error> {
self.tree.write().buffer_block(block)
}
fn insert_block(&self, block: SealedBlockWithSenders) -> Result<BlockStatus, Error> {
trace!(target: "blockchain_tree", ?block, "Inserting block");
self.tree.write().insert_block_with_senders(block)
}