perf(tree): integrate parallel state root (#7161)

This commit is contained in:
Roman Krasiuk
2024-03-21 17:08:49 +01:00
committed by GitHub
parent aac0b00f53
commit 56b63adecc
17 changed files with 184 additions and 144 deletions

1
Cargo.lock generated
View File

@ -5588,6 +5588,7 @@ dependencies = [
"reth-revm",
"reth-stages",
"reth-trie",
"reth-trie-parallel",
"tokio",
"tracing",
]

View File

@ -18,6 +18,7 @@ reth-db.workspace = true
reth-provider.workspace = true
reth-stages.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-parallel = { workspace = true, features = ["parallel"] }
# common
parking_lot.workspace = true

View File

@ -76,7 +76,11 @@ pub struct BlockchainTree<DB: Database, EVM: ExecutorFactory> {
prune_modes: Option<PruneModes>,
}
impl<DB: Database, EVM: ExecutorFactory> BlockchainTree<DB, EVM> {
impl<DB, EVM> BlockchainTree<DB, EVM>
where
DB: Database + Clone,
EVM: ExecutorFactory,
{
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, EVM>,

View File

@ -18,10 +18,12 @@ use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
providers::BundleStateProvider, BundleStateDataProvider, BundleStateWithReceipts, Chain,
ExecutorFactory, StateRootProvider,
providers::{BundleStateProvider, ConsistentDbView},
BundleStateDataProvider, BundleStateWithReceipts, Chain, ExecutorFactory, ProviderError,
StateRootProvider,
};
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
@ -74,7 +76,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let state = BundleStateWithReceipts::default();
@ -112,7 +114,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let parent_number = block.number - 1;
@ -174,7 +176,7 @@ impl AppendableChain {
) -> RethResult<(BundleStateWithReceipts, Option<TrieUpdates>)>
where
BSDP: BundleStateDataProvider,
DB: Database,
DB: Database + Clone,
EVM: ExecutorFactory,
{
// some checks are done before blocks comes here.
@ -182,8 +184,18 @@ impl AppendableChain {
// get the state provider.
let canonical_fork = bundle_state_data_provider.canonical_fork();
// SAFETY: For block execution and parallel state root computation below we open multiple
// independent database transactions. Upon opening the database transaction the consistent
// view will check a current tip in the database and throw an error if it doesn't match
// the one recorded during initialization.
// It is safe to use consistent view without any special error handling as long as
// we guarantee that plain state cannot change during processing of new payload.
// The usage has to be re-evaluated if that was ever to change.
let consistent_view =
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())?;
let state_provider =
externals.provider_factory.history_by_block_number(canonical_fork.number)?;
consistent_view.provider_ro()?.state_provider_by_block_number(canonical_fork.number)?;
let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);
@ -199,9 +211,13 @@ impl AppendableChain {
// calculate and check state root
let start = Instant::now();
let (state_root, trie_updates) = if block_attachment.is_canonical() {
provider
.state_root_with_updates(bundle_state.state())
.map(|(root, updates)| (root, Some(updates)))?
let mut state = provider.bundle_state_data_provider.state().clone();
state.extend(bundle_state.clone());
let hashed_state = state.hash_state_slow();
ParallelStateRoot::new(consistent_view, hashed_state)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
(provider.state_root(bundle_state.state())?, None)
};
@ -250,7 +266,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<(), InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let parent_block = self.chain.tip();

View File

@ -27,19 +27,27 @@ use tracing::trace;
/// Shareable blockchain tree that is behind tokio::RwLock
#[derive(Clone, Debug)]
pub struct ShareableBlockchainTree<DB: Database, EF: ExecutorFactory> {
pub struct ShareableBlockchainTree<DB: Database + Clone, EF: ExecutorFactory> {
/// BlockchainTree
pub tree: Arc<RwLock<BlockchainTree<DB, EF>>>,
}
impl<DB: Database, EF: ExecutorFactory> ShareableBlockchainTree<DB, EF> {
impl<DB, EF> ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
/// Create a new shareable database.
pub fn new(tree: BlockchainTree<DB, EF>) -> Self {
Self { tree: Arc::new(RwLock::new(tree)) }
}
}
impl<DB: Database, EF: ExecutorFactory> BlockchainTreeEngine for ShareableBlockchainTree<DB, EF> {
impl<DB, EF> BlockchainTreeEngine for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
let mut tree = self.tree.write();
// Blockchain tree metrics shouldn't be updated here, see
@ -103,7 +111,11 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreeEngine for ShareableBlockc
}
}
impl<DB: Database, EF: ExecutorFactory> BlockchainTreeViewer for ShareableBlockchainTree<DB, EF> {
impl<DB, EF> BlockchainTreeViewer for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn blocks(&self) -> BTreeMap<BlockNumber, HashSet<BlockHash>> {
trace!(target: "blockchain_tree", "Returning all blocks in blockchain tree");
self.tree.read().block_indices().block_number_to_block_hashes().clone()
@ -196,8 +208,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreeViewer for ShareableBlockc
}
}
impl<DB: Database, EF: ExecutorFactory> BlockchainTreePendingStateProvider
for ShareableBlockchainTree<DB, EF>
impl<DB, EF> BlockchainTreePendingStateProvider for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn find_pending_state_provider(
&self,
@ -209,8 +223,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreePendingStateProvider
}
}
impl<DB: Database, EF: ExecutorFactory> CanonStateSubscriptions
for ShareableBlockchainTree<DB, EF>
impl<DB, EF> CanonStateSubscriptions for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn subscribe_to_canonical_state(&self) -> reth_provider::CanonStateNotifications {
trace!(target: "blockchain_tree", "Registered subscriber for canonical state");

View File

@ -131,6 +131,9 @@ pub enum ProviderError {
/// Error encountered when the block number conversion from U256 to u64 causes an overflow.
#[error("failed to convert block number U256 to u64: {0}")]
BlockNumberOverflow(U256),
/// Consistent view error.
#[error("failed to initialize consistent view: {0}")]
ConsistentView(Box<ConsistentViewError>),
}
impl From<reth_primitives::fs::FsPathError> for ProviderError {
@ -152,7 +155,7 @@ pub struct RootMismatch {
}
/// Consistent database view error.
#[derive(Error, Debug)]
#[derive(Clone, Debug, Error, PartialEq, Eq)]
pub enum ConsistentViewError {
/// Error thrown on attempt to initialize provider while node is still syncing.
#[error("node is syncing. best block: {0}")]
@ -163,7 +166,10 @@ pub enum ConsistentViewError {
/// The tip diff.
tip: GotExpected<Option<B256>>,
},
/// Underlying provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
}
impl From<ConsistentViewError> for ProviderError {
fn from(value: ConsistentViewError) -> Self {
Self::ConsistentView(Box::new(value))
}
}

View File

@ -11,9 +11,9 @@ use revm::db::BundleState;
#[derive(Debug)]
pub struct BundleStateProvider<SP: StateProvider, BSDP: BundleStateDataProvider> {
/// The inner state provider.
pub(crate) state_provider: SP,
/// Bundle state data,
pub(crate) bundle_state_data_provider: BSDP,
pub state_provider: SP,
/// Bundle state data.
pub bundle_state_data_provider: BSDP,
}
impl<SP: StateProvider, BSDP: BundleStateDataProvider> BundleStateProvider<SP, BSDP> {

View File

@ -34,26 +34,22 @@ where
Provider: DatabaseProviderFactory<DB>,
{
/// Creates new consistent database view.
pub fn new(provider: Provider) -> Self {
Self { database: PhantomData, provider, tip: None }
pub fn new(provider: Provider, tip: Option<B256>) -> Self {
Self { database: PhantomData, provider, tip }
}
/// Initializes the view with provided tip.
pub fn with_tip(mut self, tip: B256) -> Self {
self.tip = Some(tip);
self
}
/// Initializes the view with latest tip.
pub fn with_latest_tip(mut self) -> ProviderResult<Self> {
let provider = self.provider.database_provider_ro()?;
let tip = provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?.last()?;
self.tip = tip.map(|(_, hash)| hash);
Ok(self)
/// Creates new consistent database view with latest tip.
pub fn new_with_latest_tip(provider: Provider) -> ProviderResult<Self> {
let tip = provider
.database_provider_ro()?
.tx_ref()
.cursor_read::<tables::CanonicalHeaders>()?
.last()?;
Ok(Self::new(provider, tip.map(|(_, hash)| hash)))
}
/// Creates new read-only provider and performs consistency checks on the current tip.
pub fn provider_ro(&self) -> Result<DatabaseProviderRO<DB>, ConsistentViewError> {
pub fn provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
let provider_ro = self.provider.database_provider_ro()?;
let last_entry = provider_ro
.tx_ref()
@ -65,12 +61,13 @@ where
if self.tip != tip {
return Err(ConsistentViewError::Inconsistent {
tip: GotExpected { got: tip, expected: self.tip },
})
}
.into())
}
let best_block_number = provider_ro.best_block_number()?;
if last_entry.map(|(number, _)| number).unwrap_or_default() != best_block_number {
return Err(ConsistentViewError::Syncing(best_block_number))
return Err(ConsistentViewError::Syncing(best_block_number).into())
}
Ok(provider_ro)

View File

@ -1,8 +1,5 @@
use crate::{
providers::{
state::{historical::HistoricalStateProvider, latest::LatestStateProvider},
StaticFileProvider,
},
providers::{state::latest::LatestStateProvider, StaticFileProvider},
to_range,
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
@ -127,69 +124,19 @@ impl<DB: Database> ProviderFactory<DB> {
)))
}
/// Storage provider for latest block
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
trace!(target: "providers::db", "Returning latest state provider");
Ok(Box::new(LatestStateProvider::new(self.db.tx()?, self.static_file_provider())))
}
/// Storage provider for state at that given block
fn state_provider_by_block_number(
&self,
provider: DatabaseProviderRO<DB>,
mut block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
if block_number == provider.best_block_number().unwrap_or_default() &&
block_number == provider.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(
provider.into_tx(),
self.static_file_provider(),
)))
}
// +1 as the changeset that we want is the one that was applied after this block.
block_number += 1;
let account_history_prune_checkpoint =
provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
provider.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider = HistoricalStateProvider::new(
provider.into_tx(),
block_number,
self.static_file_provider(),
);
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint_block_number + 1,
);
}
Ok(Box::new(state_provider))
}
/// Storage provider for state at that given block
pub fn history_by_block_number(
&self,
block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
let provider = self.provider()?;
let state_provider = self.state_provider_by_block_number(provider, block_number)?;
let state_provider = self.provider()?.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
Ok(state_provider)
}
@ -202,8 +149,8 @@ impl<DB: Database> ProviderFactory<DB> {
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
let state_provider = self.state_provider_by_block_number(provider, block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block hash");
let state_provider = self.provider()?.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
Ok(state_provider)
}
}

View File

@ -7,8 +7,9 @@ use crate::{
},
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
Chain, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HeaderSyncMode, HistoryWriter, OriginalValuesKnown, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, StageCheckpointReader, StatsReader, StorageReader, TransactionVariant,
HeaderSyncMode, HistoricalStateProvider, HistoryWriter, LatestStateProvider,
OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter,
StageCheckpointReader, StateProviderBox, StatsReader, StorageReader, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
@ -164,6 +165,50 @@ impl<TX: DbTx> DatabaseProvider<TX> {
}
}
impl<TX: DbTx + 'static> DatabaseProvider<TX> {
/// Storage provider for state at that given block
pub fn state_provider_by_block_number(
self,
mut block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
if block_number == self.best_block_number().unwrap_or_default() &&
block_number == self.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(self.tx, self.static_file_provider)))
}
// +1 as the changeset that we want is the one that was applied after this block.
block_number += 1;
let account_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider =
HistoricalStateProvider::new(self.tx, block_number, self.static_file_provider);
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint_block_number + 1,
);
}
Ok(Box::new(state_provider))
}
}
impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
#[cfg(any(test, feature = "test-utils"))]
/// Inserts an historical block. Used for setting up test environments

View File

@ -36,9 +36,12 @@ pub use receipts::{ReceiptProvider, ReceiptProviderIdExt};
mod state;
pub use state::{
BlockchainTreePendingStateProvider, BundleStateDataProvider, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider,
StateProviderFactory,
};
mod trie;
pub use trie::StateRootProvider;
mod transactions;
pub use transactions::{TransactionsProvider, TransactionsProviderExt};

View File

@ -1,13 +1,11 @@
use super::AccountReader;
use crate::{BlockHashReader, BlockIdReader, BundleStateWithReceipts};
use crate::{BlockHashReader, BlockIdReader, BundleStateWithReceipts, StateRootProvider};
use auto_impl::auto_impl;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_primitives::{
trie::AccountProof, Address, BlockHash, BlockId, BlockNumHash, BlockNumber, BlockNumberOrTag,
Bytecode, StorageKey, StorageValue, B256, KECCAK_EMPTY, U256,
};
use reth_trie::updates::TrieUpdates;
use revm::db::BundleState;
/// Type alias of boxed [StateProvider].
pub type StateProviderBox = Box<dyn StateProvider>;
@ -216,7 +214,7 @@ pub trait BlockchainTreePendingStateProvider: Send + Sync {
/// * [`BundleStateWithReceipts`] contains all changed of accounts and storage of pending chain
/// * block hashes of pending chain and canonical blocks.
/// * canonical fork, the block on what pending chain was forked from.
#[auto_impl[Box,&]]
#[auto_impl(&, Box)]
pub trait BundleStateDataProvider: Send + Sync {
/// Return post state
fn state(&self) -> &BundleStateWithReceipts;
@ -227,21 +225,3 @@ pub trait BundleStateDataProvider: Send + Sync {
/// Needed to create state provider.
fn canonical_fork(&self) -> BlockNumHash;
}
/// A type that can compute the state root of a given post state.
#[auto_impl[Box,&, Arc]]
pub trait StateRootProvider: Send + Sync {
/// Returns the state root of the `BundleState` on top of the current state.
///
/// NOTE: It is recommended to provide a different implementation from
/// `state_root_with_updates` since it affects the memory usage during state root
/// computation.
fn state_root(&self, bundle_state: &BundleState) -> ProviderResult<B256>;
/// Returns the state root of the BundleState on top of the current state with trie
/// updates to be committed to the database.
fn state_root_with_updates(
&self,
bundle_state: &BundleState,
) -> ProviderResult<(B256, TrieUpdates)>;
}

View File

@ -0,0 +1,25 @@
use auto_impl::auto_impl;
use reth_interfaces::provider::ProviderResult;
use reth_primitives::B256;
use reth_trie::updates::TrieUpdates;
use revm::db::BundleState;
/// A type that can compute the state root of a given post state.
#[auto_impl(&, Box, Arc)]
pub trait StateRootProvider: Send + Sync {
/// Returns the state root of the `BundleState` on top of the current state.
///
/// # Note
///
/// It is recommended to provide a different implementation from
/// `state_root_with_updates` since it affects the memory usage during state root
/// computation.
fn state_root(&self, bundle_state: &BundleState) -> ProviderResult<B256>;
/// Returns the state root of the BundleState on top of the current state with trie
/// updates to be committed to the database.
fn state_root_with_updates(
&self,
bundle_state: &BundleState,
) -> ProviderResult<(B256, TrieUpdates)>;
}

View File

@ -55,7 +55,7 @@ criterion = { workspace = true, features = ["async_tokio"] }
proptest.workspace = true
[features]
default = ["metrics"]
default = ["metrics", "async", "parallel"]
metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics"]
async = ["reth-tasks/rayon", "tokio/sync", "itertools"]
parallel = ["rayon"]

View File

@ -33,7 +33,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
provider_rw.commit().unwrap();
}
let view = ConsistentDbView::new(provider_factory.clone());
let view = ConsistentDbView::new(provider_factory.clone(), None);
// state root
group.bench_function(BenchmarkId::new("sync root", size), |b| {

View File

@ -7,10 +7,7 @@ use reth_primitives::{
trie::{HashBuilder, Nibbles, TrieAccount},
B256,
};
use reth_provider::{
providers::{ConsistentDbView, ConsistentViewError},
DatabaseProviderFactory, ProviderError,
};
use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError};
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory,
@ -219,9 +216,6 @@ pub enum AsyncStateRootError {
/// The hashed address for which channel was closed.
hashed_address: B256,
},
/// Consistency error on attempt to create new database provider.
#[error(transparent)]
ConsistentView(#[from] ConsistentViewError),
/// Error while calculating storage root.
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
@ -244,7 +238,7 @@ mod tests {
let blocking_pool = BlockingTaskPool::new(ThreadPoolBuilder::default().build().unwrap());
let factory = create_test_provider_factory();
let consistent_view = ConsistentDbView::new(factory.clone());
let consistent_view = ConsistentDbView::new(factory.clone(), None);
let mut rng = rand::thread_rng();
let mut state = (0..100)

View File

@ -7,10 +7,7 @@ use reth_primitives::{
trie::{HashBuilder, Nibbles, TrieAccount},
B256,
};
use reth_provider::{
providers::{ConsistentDbView, ConsistentViewError},
DatabaseProviderFactory, ProviderError,
};
use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError};
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{AccountNode, AccountNodeIter},
@ -195,9 +192,6 @@ where
/// Error during parallel state root calculation.
#[derive(Error, Debug)]
pub enum ParallelStateRootError {
/// Consistency error on attempt to create new database provider.
#[error(transparent)]
ConsistentView(#[from] ConsistentViewError),
/// Error while calculating storage root.
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
@ -206,6 +200,17 @@ pub enum ParallelStateRootError {
Provider(#[from] ProviderError),
}
impl From<ParallelStateRootError> for ProviderError {
fn from(error: ParallelStateRootError) -> Self {
match error {
ParallelStateRootError::Provider(error) => error,
ParallelStateRootError::StorageRoot(StorageRootError::DB(error)) => {
ProviderError::Database(error)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -217,7 +222,7 @@ mod tests {
#[tokio::test]
async fn random_parallel_root() {
let factory = create_test_provider_factory();
let consistent_view = ConsistentDbView::new(factory.clone());
let consistent_view = ConsistentDbView::new(factory.clone(), None);
let mut rng = rand::thread_rng();
let mut state = (0..100)