mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)

feat(engine): integrate sparse trie into the state root task (#12907)
@@ -7267,7 +7267,6 @@ dependencies = [
  "reth-errors",
  "reth-ethereum-engine-primitives",
  "reth-evm",
- "reth-execution-errors",
  "reth-exex-types",
  "reth-metrics",
  "reth-network-p2p",
@@ -21,7 +21,6 @@ reth-consensus.workspace = true
 reth-engine-primitives.workspace = true
 reth-errors.workspace = true
 reth-evm.workspace = true
-reth-execution-errors.workspace = true
 reth-network-p2p.workspace = true
 reth-payload-builder-primitives.workspace = true
 reth-payload-builder.workspace = true
@@ -95,10 +94,6 @@ rand.workspace = true
 name = "channel_perf"
 harness = false
 
-[[bench]]
-name = "state_root_from_proofs"
-harness = false
-
 [features]
 test-utils = [
     "reth-blockchain-tree/test-utils",
@@ -1,81 +0,0 @@
-#![allow(missing_docs)]
-
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use reth_engine_tree::tree::calculate_state_root_from_proofs;
-use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
-use reth_trie::{
-    updates::TrieUpdatesSorted, HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof,
-};
-use revm_primitives::{
-    keccak256, Account, AccountInfo, AccountStatus, Address, EvmStorage, EvmStorageSlot, HashMap,
-    HashSet, B256, U256,
-};
-
-fn create_test_state(size: usize) -> (HashMap<B256, HashSet<B256>>, HashedPostState) {
-    let mut state = HashedPostState::default();
-    let mut targets = HashMap::default();
-
-    for i in 0..size {
-        let address = Address::random();
-        let hashed_address = keccak256(address);
-
-        // Create account
-        let info = AccountInfo {
-            balance: U256::from(100 + i),
-            nonce: i as u64,
-            code_hash: B256::random(),
-            code: Default::default(),
-        };
-
-        // Create storage with multiple slots
-        let mut storage = EvmStorage::default();
-        let mut slots = HashSet::default();
-        for j in 0..100 {
-            let slot = U256::from(j);
-            let value = U256::from(100 + j);
-            storage.insert(slot, EvmStorageSlot::new(value));
-            slots.insert(keccak256(B256::from(slot)));
-        }
-
-        let account = Account { info, storage: storage.clone(), status: AccountStatus::Loaded };
-
-        state.accounts.insert(hashed_address, Some(account.info.into()));
-        state.storages.insert(
-            hashed_address,
-            HashedStorage::from_iter(
-                false,
-                storage.into_iter().map(|(k, v)| (keccak256(B256::from(k)), v.present_value)),
-            ),
-        );
-        targets.insert(hashed_address, slots);
-    }
-
-    (targets, state)
-}
-
-fn bench_state_root_collection(c: &mut Criterion) {
-    let factory = create_test_provider_factory();
-    let view = ConsistentDbView::new(factory, None);
-
-    let mut group = c.benchmark_group("state_root_collection");
-    for size in &[10, 100, 1000] {
-        let (_targets, state) = create_test_state(*size);
-        let multiproof = MultiProof::default();
-
-        group.bench_with_input(format!("size_{}", size), size, |b, _| {
-            b.iter(|| {
-                black_box(calculate_state_root_from_proofs(
-                    view.clone(),
-                    &TrieUpdatesSorted::default(),
-                    &HashedPostStateSorted::default(),
-                    multiproof.clone(),
-                    state.clone(),
-                ))
-            });
-        });
-    }
-    group.finish();
-}
-
-criterion_group!(benches, bench_state_root_collection);
-criterion_main!(benches);
@@ -80,7 +80,6 @@ pub use config::TreeConfig;
 pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
 pub use persistence_state::PersistenceState;
 pub use reth_engine_primitives::InvalidBlockHook;
-pub use root::calculate_state_root_from_proofs;
 
 mod root;
 
@@ -1,23 +1,15 @@
 //! State root task related functionality.
 
-use alloy_primitives::map::{DefaultHashBuilder, FbHashMap, FbHashSet, HashMap, HashSet};
+use alloy_primitives::map::{FbHashMap, HashMap, HashSet};
 use alloy_rlp::{BufMut, Encodable};
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-use reth_errors::ProviderResult;
-use reth_execution_errors::TrieWitnessError;
 use reth_provider::{
     providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
 };
 use reth_trie::{
-    hashed_cursor::HashedPostStateCursorFactory,
-    proof::Proof,
-    trie_cursor::InMemoryTrieCursorFactory,
-    updates::{TrieUpdates, TrieUpdatesSorted},
-    witness::{next_root_from_proofs, target_nodes},
-    HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof, Nibbles, TrieAccount,
-    TrieInput, EMPTY_ROOT_HASH,
+    proof::Proof, updates::TrieUpdates, HashedPostState, HashedStorage, MultiProof, Nibbles,
+    TrieAccount, TrieInput, EMPTY_ROOT_HASH,
 };
-use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseProof, DatabaseTrieCursorFactory};
+use reth_trie_db::DatabaseProof;
 use reth_trie_parallel::root::ParallelStateRootError;
 use reth_trie_sparse::{SparseStateTrie, SparseStateTrieResult};
 use revm_primitives::{keccak256, EvmState, B256};
@@ -94,15 +86,15 @@ pub(crate) enum StateRootMessage {
     ProofCalculated {
         /// The calculated proof
         proof: MultiProof,
+        /// The state update that was used to calculate the proof
+        state_update: HashedPostState,
         /// The index of this proof in the sequence of state updates
         sequence_number: u64,
     },
     /// State root calculation completed
     RootCalculated {
-        /// The calculated state root
-        root: B256,
-        /// The trie updates produced during calculation
-        updates: TrieUpdates,
+        /// The updated sparse trie
+        trie: Box<SparseStateTrie>,
         /// Time taken to calculate the root
         elapsed: Duration,
     },
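With this change `ProofCalculated` carries the hashed state update alongside its proof and sequence number, and `RootCalculated` hands the whole boxed sparse trie back instead of a bare root plus trie updates. A self-contained sketch of that message flow, using only `std::sync::mpsc` and made-up placeholder types (none of the reth types or APIs appear here), might look like this:

```rust
use std::sync::mpsc;

// Placeholder payloads standing in for MultiProof, HashedPostState and SparseStateTrie.
#[derive(Default)]
struct Proof(Vec<u8>);
#[derive(Default)]
struct StateUpdate(Vec<u8>);
#[derive(Default)]
struct Trie(Vec<u8>);

// Mirrors the two message shapes: proofs arrive tagged with the state update they were
// computed for, and a finished calculation hands the whole trie back for reuse.
enum Message {
    ProofCalculated { proof: Proof, state_update: StateUpdate, sequence_number: u64 },
    RootCalculated { trie: Box<Trie> },
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // A proof worker reports its result together with the state update it belongs to.
    tx.send(Message::ProofCalculated {
        proof: Proof::default(),
        state_update: StateUpdate::default(),
        sequence_number: 0,
    })
    .unwrap();

    // A root-calculation worker returns the boxed trie when it is done.
    tx.send(Message::RootCalculated { trie: Box::new(Trie::default()) }).unwrap();
    drop(tx); // close the channel so the loop below terminates

    for message in rx {
        match message {
            Message::ProofCalculated { proof, state_update, sequence_number } => {
                println!(
                    "proof #{sequence_number}: {} proof bytes, {} state bytes",
                    proof.0.len(),
                    state_update.0.len()
                );
            }
            Message::RootCalculated { trie } => {
                println!("trie returned ({} bytes of payload)", trie.0.len());
            }
        }
    }
}
```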
@@ -115,8 +107,8 @@ pub(crate) struct ProofSequencer {
     next_sequence: u64,
     /// The next sequence number expected to be delivered.
     next_to_deliver: u64,
-    /// Buffer for out-of-order proofs
-    pending_proofs: BTreeMap<u64, MultiProof>,
+    /// Buffer for out-of-order proofs and corresponding state updates
+    pending_proofs: BTreeMap<u64, (MultiProof, HashedPostState)>,
 }
 
 impl ProofSequencer {
@@ -132,10 +124,16 @@ impl ProofSequencer {
         seq
     }
 
-    /// Adds a proof and returns all sequential proofs if we have a continuous sequence
-    pub(crate) fn add_proof(&mut self, sequence: u64, proof: MultiProof) -> Vec<MultiProof> {
+    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
+    /// updates if we have a continuous sequence
+    pub(crate) fn add_proof(
+        &mut self,
+        sequence: u64,
+        proof: MultiProof,
+        state_update: HashedPostState,
+    ) -> Vec<(MultiProof, HashedPostState)> {
         if sequence >= self.next_to_deliver {
-            self.pending_proofs.insert(sequence, proof);
+            self.pending_proofs.insert(sequence, (proof, state_update));
         }
 
         // return early if we don't have the next expected proof
@@ -146,9 +144,9 @@ impl ProofSequencer {
         let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
         let mut current_sequence = self.next_to_deliver;
 
-        // keep collecting proofs as long as we have consecutive sequence numbers
-        while let Some(proof) = self.pending_proofs.remove(&current_sequence) {
-            consecutive_proofs.push(proof);
+        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
+        while let Some((proof, state_update)) = self.pending_proofs.remove(&current_sequence) {
+            consecutive_proofs.push((proof, state_update));
             current_sequence += 1;
 
             // if we don't have the next number, stop collecting
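The sequencer change is mechanical: the buffer now stores `(MultiProof, HashedPostState)` pairs and still drains them in consecutive sequence order. A minimal standalone sketch of the same buffering idea over plain `String` payloads follows; the names mimic the diff but nothing here is reth code:

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order (proof, state update) pairs and releases them in order.
#[derive(Default)]
struct Sequencer {
    next_to_deliver: u64,
    pending: BTreeMap<u64, (String, String)>,
}

impl Sequencer {
    /// Inserts a pair and returns every consecutive pair starting at `next_to_deliver`.
    fn add(&mut self, sequence: u64, proof: String, state_update: String) -> Vec<(String, String)> {
        if sequence >= self.next_to_deliver {
            self.pending.insert(sequence, (proof, state_update));
        }

        let mut ready = Vec::new();
        // Drain the buffer while the next expected sequence number is present.
        while let Some(pair) = self.pending.remove(&self.next_to_deliver) {
            ready.push(pair);
            self.next_to_deliver += 1;
        }
        ready
    }
}

fn main() {
    let mut sequencer = Sequencer::default();
    // Out-of-order arrival: sequence 1 is buffered until 0 shows up.
    assert!(sequencer.add(1, "p1".into(), "s1".into()).is_empty());
    let ready = sequencer.add(0, "p0".into(), "s0".into());
    assert_eq!(ready.len(), 2); // both pairs are released, in order
}
```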
@@ -180,18 +178,19 @@ impl ProofSequencer {
 /// Then it updates relevant leaves according to the result of the transaction.
 #[derive(Debug)]
 pub(crate) struct StateRootTask<Factory> {
+    /// Task configuration
+    config: StateRootConfig<Factory>,
     /// Receiver for state root related messages
     rx: Receiver<StateRootMessage>,
     /// Sender for state root related messages
     tx: Sender<StateRootMessage>,
-    /// Task configuration
-    config: StateRootConfig<Factory>,
-    /// Current state
-    state: HashedPostState,
+    /// Proof targets that have been already fetched
+    fetched_proof_targets: HashSet<B256>,
     /// Proof sequencing handler
     proof_sequencer: ProofSequencer,
-    /// Whether we're currently calculating a root
-    calculating_root: bool,
+    /// The sparse trie used for the state root calculation. If [`None`], then update is in
+    /// progress.
+    sparse_trie: Option<Box<SparseStateTrie>>,
 }
 
 #[allow(dead_code)]
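Replacing the `calculating_root: bool` flag with `sparse_trie: Option<Box<SparseStateTrie>>` ties "a calculation is in flight" to ownership of the trie itself: the task takes the trie out when it spawns a calculation and puts it back when the `RootCalculated` message arrives. A small sketch of that take/put-back pattern with a dummy trie type (nothing reth-specific) could look like:

```rust
/// Stand-in for the sparse trie; only here to have something whose ownership moves around.
#[derive(Default)]
struct Trie {
    leaves: usize,
}

struct Task {
    /// `Some` means the trie is idle and owned by the task;
    /// `None` means a background calculation currently owns it.
    trie: Option<Box<Trie>>,
}

impl Task {
    /// Checks the trie out for a calculation, or returns `None` if one is already in flight.
    fn try_spawn(&mut self) -> Option<Box<Trie>> {
        self.trie.take()
    }

    /// Called when the background calculation hands the trie back.
    fn on_finished(&mut self, trie: Box<Trie>) {
        self.trie = Some(trie);
    }
}

fn main() {
    let mut task = Task { trie: Some(Box::new(Trie::default())) };

    let mut checked_out = task.try_spawn().expect("trie was idle");
    assert!(task.try_spawn().is_none()); // a second spawn is refused while one is in flight

    checked_out.leaves += 1; // the "calculation" mutates the trie it owns
    task.on_finished(checked_out); // ownership returns to the task

    assert_eq!(task.trie.as_ref().map(|trie| trie.leaves), Some(1));
}
```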
@@ -209,9 +208,9 @@ where
             config,
             rx,
             tx,
-            state: Default::default(),
+            fetched_proof_targets: Default::default(),
             proof_sequencer: ProofSequencer::new(),
-            calculating_root: false,
+            sparse_trie: Some(Box::new(SparseStateTrie::default().with_updates(true))),
         }
     }
 
@@ -231,14 +230,16 @@ where
     }
 
     /// Handles state updates.
+    ///
+    /// Returns proof targets derived from the state update.
     fn on_state_update(
         view: ConsistentDbView<Factory>,
         input: Arc<TrieInput>,
         update: EvmState,
-        state: &mut HashedPostState,
+        fetched_proof_targets: &HashSet<B256>,
         proof_sequence_number: u64,
         state_root_message_sender: Sender<StateRootMessage>,
-    ) {
+    ) -> HashMap<B256, HashSet<B256>> {
         let mut hashed_state_update = HashedPostState::default();
         for (address, account) in update {
             if account.is_touched() {
@@ -263,20 +264,10 @@ where
             }
         }
 
-        // Dispatch proof gathering for this state update
-        let targets = hashed_state_update
-            .accounts
-            .keys()
-            .filter(|hashed_address| {
-                !state.accounts.contains_key(*hashed_address) &&
-                    !state.storages.contains_key(*hashed_address)
-            })
-            .map(|hashed_address| (*hashed_address, HashSet::default()))
-            .chain(hashed_state_update.storages.iter().map(|(hashed_address, storage)| {
-                (*hashed_address, storage.storage.keys().copied().collect())
-            }))
-            .collect::<HashMap<_, _>>();
+        let proof_targets = get_proof_targets(&hashed_state_update, fetched_proof_targets);
 
+        // Dispatch proof gathering for this state update
+        let targets = proof_targets.clone();
         rayon::spawn(move || {
            let provider = match view.provider_ro() {
                Ok(provider) => provider,
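Proof gathering stays fire-and-forget: the proof targets are computed, the work is handed to the rayon pool, and the result comes back over the task's channel. A stripped-down sketch of that dispatch pattern with a fake workload is below; it assumes only `rayon::spawn` and `std::sync::mpsc`, and none of the reth provider or proof code:

```rust
use std::sync::mpsc;

/// Result message sent back to the coordinating task.
enum Message {
    Done { sequence_number: u64, result: u64 },
}

fn dispatch(sequence_number: u64, input: Vec<u64>, tx: mpsc::Sender<Message>) {
    // The closure takes ownership of everything it needs and runs on the rayon pool;
    // the caller does not wait for it.
    rayon::spawn(move || {
        let result = input.iter().sum(); // stand-in for the multiproof computation
        // The receiver may already be gone; like the diff, ignore the send error.
        let _ = tx.send(Message::Done { sequence_number, result });
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    dispatch(0, vec![1, 2, 3], tx.clone());
    dispatch(1, vec![10, 20], tx);

    // Collect both results; they may arrive in either order.
    let mut done = 0;
    while done < 2 {
        let Message::Done { sequence_number, result } = rx.recv().expect("worker sent a result");
        println!("proof {sequence_number} -> {result}");
        done += 1;
    }
}
```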
@@ -287,12 +278,17 @@ where
            };
 
            // TODO: replace with parallel proof
-            let result =
-                Proof::overlay_multiproof(provider.tx_ref(), input.as_ref().clone(), targets);
+            let result = Proof::overlay_multiproof(
+                provider.tx_ref(),
+                // TODO(alexey): this clone can be expensive, we should avoid it
+                input.as_ref().clone(),
+                targets,
+            );
            match result {
                Ok(proof) => {
                    let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated {
                        proof,
+                        state_update: hashed_state_update,
                        sequence_number: proof_sequence_number,
                    });
                }
@@ -302,30 +298,33 @@ where
                }
            });
 
-        state.extend(hashed_state_update);
+        proof_targets
     }
 
     /// Handler for new proof calculated, aggregates all the existing sequential proofs.
-    fn on_proof(&mut self, proof: MultiProof, sequence_number: u64) -> Option<MultiProof> {
-        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, proof);
+    fn on_proof(
+        &mut self,
+        sequence_number: u64,
+        proof: MultiProof,
+        state_update: HashedPostState,
+    ) -> Option<(MultiProof, HashedPostState)> {
+        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, proof, state_update);
 
         if ready_proofs.is_empty() {
            None
        } else {
-            // combine all ready proofs into one
-            ready_proofs.into_iter().reduce(|mut acc, proof| {
-                acc.extend(proof);
+            // Merge all ready proofs and state updates
+            ready_proofs.into_iter().reduce(|mut acc, (proof, state_update)| {
+                acc.0.extend(proof);
+                acc.1.extend(state_update);
                acc
            })
        }
    }
 
-    /// Spawns root calculation with the current state and proofs
-    fn spawn_root_calculation(&mut self, multiproof: MultiProof) {
-        if self.calculating_root {
-            return;
-        }
-        self.calculating_root = true;
+    /// Spawns root calculation with the current state and proofs.
+    fn spawn_root_calculation(&mut self, state: HashedPostState, multiproof: MultiProof) {
+        let Some(trie) = self.sparse_trie.take() else { return };
 
        trace!(
            target: "engine::root",
@@ -334,28 +333,20 @@ where
            "Spawning root calculation"
        );
 
-        let tx = self.tx.clone();
-        let view = self.config.consistent_view.clone();
-        let input = self.config.input.clone();
-        let state = self.state.clone();
+        // TODO(alexey): store proof targets in `ProofSequecner` to avoid recomputing them
+        let targets = get_proof_targets(&state, &HashSet::default());
 
+        let tx = self.tx.clone();
        rayon::spawn(move || {
-            let result = calculate_state_root_from_proofs(
-                view,
-                &input.nodes.clone().into_sorted(),
-                &input.state.clone().into_sorted(),
-                multiproof,
-                state,
-            );
+            let result = update_sparse_trie(trie, multiproof, targets, state);
            match result {
-                Ok((root, updates, elapsed)) => {
+                Ok((trie, elapsed)) => {
                    trace!(
                        target: "engine::root",
-                        %root,
                        ?elapsed,
                        "Root calculation completed, sending result"
                    );
-                    let _ = tx.send(StateRootMessage::RootCalculated { root, updates, elapsed });
+                    let _ = tx.send(StateRootMessage::RootCalculated { trie, elapsed });
                }
                Err(e) => {
                    error!(target: "engine::root", error = ?e, "Could not calculate state root");
@@ -365,9 +356,8 @@ where
    }
 
    fn run(mut self) -> StateRootResult {
+        let mut current_state_update = HashedPostState::default();
        let mut current_multiproof = MultiProof::default();
-        let mut trie_updates = TrieUpdates::default();
-        let mut current_root: B256;
        let mut updates_received = 0;
        let mut proofs_processed = 0;
        let mut roots_calculated = 0;
@@ -383,16 +373,18 @@ where
                        total_updates = updates_received,
                        "Received new state update"
                    );
-                    Self::on_state_update(
+                    let targets = Self::on_state_update(
                        self.config.consistent_view.clone(),
                        self.config.input.clone(),
                        update,
-                        &mut self.state,
+                        &self.fetched_proof_targets,
                        self.proof_sequencer.next_sequence(),
                        self.tx.clone(),
                    );
+                    self.fetched_proof_targets.extend(targets.keys());
+                    self.fetched_proof_targets.extend(targets.values().flatten());
                }
-                StateRootMessage::ProofCalculated { proof, sequence_number } => {
+                StateRootMessage::ProofCalculated { proof, state_update, sequence_number } => {
                    proofs_processed += 1;
                    trace!(
                        target: "engine::root",
|
|||||||
"Processing calculated proof"
|
"Processing calculated proof"
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(combined_proof) = self.on_proof(proof, sequence_number) {
|
if let Some((combined_proof, combined_state_update)) =
|
||||||
if self.calculating_root {
|
self.on_proof(sequence_number, proof, state_update)
|
||||||
|
{
|
||||||
|
if self.sparse_trie.is_none() {
|
||||||
current_multiproof.extend(combined_proof);
|
current_multiproof.extend(combined_proof);
|
||||||
|
current_state_update.extend(combined_state_update);
|
||||||
} else {
|
} else {
|
||||||
self.spawn_root_calculation(combined_proof);
|
self.spawn_root_calculation(combined_state_update, combined_proof);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
StateRootMessage::RootCalculated { root, updates, elapsed } => {
|
StateRootMessage::RootCalculated { trie, elapsed } => {
|
||||||
roots_calculated += 1;
|
roots_calculated += 1;
|
||||||
trace!(
|
trace!(
|
||||||
target: "engine::root",
|
target: "engine::root",
|
||||||
%root,
|
|
||||||
?elapsed,
|
?elapsed,
|
||||||
roots_calculated,
|
roots_calculated,
|
||||||
proofs = proofs_processed,
|
proofs = proofs_processed,
|
||||||
updates = updates_received,
|
updates = updates_received,
|
||||||
"Computed intermediate root"
|
"Computed intermediate root"
|
||||||
);
|
);
|
||||||
current_root = root;
|
self.sparse_trie = Some(trie);
|
||||||
trie_updates.extend(updates);
|
|
||||||
self.calculating_root = false;
|
|
||||||
|
|
||||||
let has_new_proofs = !current_multiproof.account_subtree.is_empty() ||
|
let has_new_proofs = !current_multiproof.account_subtree.is_empty() ||
|
||||||
!current_multiproof.storages.is_empty();
|
!current_multiproof.storages.is_empty();
|
||||||
@@ -445,7 +437,10 @@ where
                            storage_proofs = current_multiproof.storages.len(),
                            "Spawning subsequent root calculation"
                        );
-                        self.spawn_root_calculation(std::mem::take(&mut current_multiproof));
+                        self.spawn_root_calculation(
+                            std::mem::take(&mut current_state_update),
+                            std::mem::take(&mut current_multiproof),
+                        );
                    } else if all_proofs_received && no_pending {
                        debug!(
                            target: "engine::root",
@@ -454,7 +449,15 @@ where
                            roots_calculated,
                            "All proofs processed, ending calculation"
                        );
-                        return Ok((current_root, trie_updates));
+                        let mut trie = self
+                            .sparse_trie
+                            .take()
+                            .expect("sparse trie update should not be in progress");
+                        let root = trie.root().expect("sparse trie should be revealed");
+                        let trie_updates = trie
+                            .take_trie_updates()
+                            .expect("sparse trie should have updates retention enabled");
+                        return Ok((root, trie_updates));
                    }
                }
            },
@@ -474,156 +477,27 @@ where
    }
 }
 
-/// Calculate state root from proofs.
-pub fn calculate_state_root_from_proofs<Factory>(
-    view: ConsistentDbView<Factory>,
-    input_nodes_sorted: &TrieUpdatesSorted,
-    input_state_sorted: &HashedPostStateSorted,
-    multiproof: MultiProof,
-    state: HashedPostState,
-) -> ProviderResult<(B256, TrieUpdates, Duration)>
-where
-    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone,
-{
-    let started_at = Instant::now();
-
-    let proof_targets: HashMap<B256, HashSet<B256>> = state
+fn get_proof_targets(
+    state_update: &HashedPostState,
+    fetched_proof_targets: &HashSet<B256>,
+) -> HashMap<B256, HashSet<B256>> {
+    state_update
        .accounts
        .keys()
+        .filter(|hashed_address| !fetched_proof_targets.contains(*hashed_address))
        .map(|hashed_address| (*hashed_address, HashSet::default()))
-        .chain(state.storages.iter().map(|(hashed_address, storage)| {
+        .chain(state_update.storages.iter().map(|(hashed_address, storage)| {
            (*hashed_address, storage.storage.keys().copied().collect())
        }))
-        .collect();
-
-    let account_trie_nodes = proof_targets
-        .into_par_iter()
-        .map_init(
-            || view.provider_ro().unwrap(),
-            |provider_ro, (hashed_address, hashed_slots)| {
-                // Gather and record storage trie nodes for this account.
-                let mut storage_trie_nodes = BTreeMap::default();
-                let storage = state.storages.get(&hashed_address);
-                for hashed_slot in hashed_slots {
-                    let slot_key = Nibbles::unpack(hashed_slot);
-                    let slot_value = storage
-                        .and_then(|s| s.storage.get(&hashed_slot))
-                        .filter(|v| !v.is_zero())
-                        .map(|v| alloy_rlp::encode_fixed_size(v).to_vec());
-                    let proof = multiproof
-                        .storages
-                        .get(&hashed_address)
-                        .map(|proof| {
-                            proof
-                                .subtree
-                                .iter()
-                                .filter(|e| slot_key.starts_with(e.0))
-                                .collect::<Vec<_>>()
-                        })
-                        .unwrap_or_default();
-                    storage_trie_nodes.extend(target_nodes(
-                        slot_key.clone(),
-                        slot_value,
-                        None,
-                        proof,
-                    )?);
-                }
-
-                let storage_root = next_root_from_proofs(storage_trie_nodes, |key: Nibbles| {
-                    // Right pad the target with 0s.
-                    let mut padded_key = key.pack();
-                    padded_key.resize(32, 0);
-                    let mut targets = HashMap::with_hasher(DefaultHashBuilder::default());
-                    let mut slots = HashSet::with_hasher(DefaultHashBuilder::default());
-                    slots.insert(B256::from_slice(&padded_key));
-                    targets.insert(hashed_address, slots);
-                    let proof = Proof::new(
-                        InMemoryTrieCursorFactory::new(
-                            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-                            input_nodes_sorted,
-                        ),
-                        HashedPostStateCursorFactory::new(
-                            DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-                            input_state_sorted,
-                        ),
-                    )
-                    .multiproof(targets)
-                    .unwrap();
-
-                    // The subtree only contains the proof for a single target.
-                    let node = proof
-                        .storages
-                        .get(&hashed_address)
-                        .and_then(|storage_multiproof| storage_multiproof.subtree.get(&key))
-                        .cloned()
-                        .ok_or(TrieWitnessError::MissingTargetNode(key))?;
-                    Ok(node)
-                })?;
-
-                // Gather and record account trie nodes.
-                let account = state
-                    .accounts
-                    .get(&hashed_address)
-                    .ok_or(TrieWitnessError::MissingAccount(hashed_address))?;
-                let value = (account.is_some() || storage_root != EMPTY_ROOT_HASH).then(|| {
-                    let mut encoded = Vec::with_capacity(128);
-                    TrieAccount::from((account.unwrap_or_default(), storage_root))
-                        .encode(&mut encoded as &mut dyn BufMut);
-                    encoded
-                });
-                let key = Nibbles::unpack(hashed_address);
-                let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0));
-                target_nodes(key.clone(), value, None, proof)
-            },
-        )
-        .try_reduce(BTreeMap::new, |mut acc, map| {
-            acc.extend(map.into_iter());
-            Ok(acc)
-        })?;
-
-    let provider_ro = view.provider_ro()?;
-
-    let state_root = next_root_from_proofs(account_trie_nodes, |key: Nibbles| {
-        // Right pad the target with 0s.
-        let mut padded_key = key.pack();
-        padded_key.resize(32, 0);
-        let mut targets = HashMap::with_hasher(DefaultHashBuilder::default());
-        targets.insert(
-            B256::from_slice(&padded_key),
-            HashSet::with_hasher(DefaultHashBuilder::default()),
-        );
-        let proof = Proof::new(
-            InMemoryTrieCursorFactory::new(
-                DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-                input_nodes_sorted,
-            ),
-            HashedPostStateCursorFactory::new(
-                DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-                input_state_sorted,
-            ),
-        )
-        .multiproof(targets)
-        .unwrap();
-
-        // The subtree only contains the proof for a single target.
-        let node = proof
-            .account_subtree
-            .get(&key)
-            .cloned()
-            .ok_or(TrieWitnessError::MissingTargetNode(key))?;
-        Ok(node)
-    })?;
-
-    Ok((state_root, Default::default(), started_at.elapsed()))
+        .collect()
 }
 
 /// Updates the sparse trie with the given proofs and state, and returns the updated trie and the
 /// time it took.
-#[allow(dead_code)]
 fn update_sparse_trie(
    mut trie: Box<SparseStateTrie>,
    multiproof: MultiProof,
-    targets: FbHashMap<32, FbHashSet<32>>,
+    targets: HashMap<B256, HashSet<B256>>,
    state: HashedPostState,
 ) -> SparseStateTrieResult<(Box<SparseStateTrie>, Duration)> {
    let started_at = Instant::now();
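The new `get_proof_targets` derives proof targets from a state update while skipping accounts whose proofs were already fetched, and the run loop then folds the returned keys and slots into `fetched_proof_targets` so later updates only request what is still missing. The same filtering idea over plain `std` collections (illustrative only; the field names and key types here are simplified stand-ins) looks like this:

```rust
use std::collections::{HashMap, HashSet};

/// Simplified state update: touched accounts and their touched storage slots.
struct StateUpdate {
    accounts: HashSet<String>,
    storages: HashMap<String, HashSet<u64>>,
}

/// Accounts not seen before get an empty slot set; accounts with storage changes get their
/// touched slots. Accounts whose proofs were already fetched are skipped entirely.
fn get_proof_targets(
    update: &StateUpdate,
    fetched: &HashSet<String>,
) -> HashMap<String, HashSet<u64>> {
    update
        .accounts
        .iter()
        .filter(|account| !fetched.contains(*account))
        .map(|account| (account.clone(), HashSet::new()))
        .chain(update.storages.iter().map(|(account, slots)| (account.clone(), slots.clone())))
        .collect()
}

fn main() {
    let update = StateUpdate {
        accounts: HashSet::from(["alice".to_string(), "bob".to_string()]),
        storages: HashMap::from([("bob".to_string(), HashSet::from([1, 2]))]),
    };
    let mut fetched = HashSet::from(["alice".to_string()]);

    let targets = get_proof_targets(&update, &fetched);
    // "alice" is skipped, "bob" is targeted with its two touched slots.
    assert_eq!(targets.len(), 1);
    assert_eq!(targets["bob"], HashSet::from([1, 2]));

    // Mirror the run loop: remember what was requested so it is not fetched again.
    fetched.extend(targets.keys().cloned());
    assert!(fetched.contains("bob"));
}
```

The run loop in the diff also flattens the returned slot sets into the same fetched set; the sketch only tracks accounts to keep it short.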
@@ -860,11 +734,11 @@ mod tests {
        let proof2 = MultiProof::default();
        sequencer.next_sequence = 2;
 
-        let ready = sequencer.add_proof(0, proof1);
+        let ready = sequencer.add_proof(0, proof1, HashedPostState::default());
        assert_eq!(ready.len(), 1);
        assert!(!sequencer.has_pending());
 
-        let ready = sequencer.add_proof(1, proof2);
+        let ready = sequencer.add_proof(1, proof2, HashedPostState::default());
        assert_eq!(ready.len(), 1);
        assert!(!sequencer.has_pending());
    }
@@ -877,15 +751,15 @@ mod tests {
        let proof3 = MultiProof::default();
        sequencer.next_sequence = 3;
 
-        let ready = sequencer.add_proof(2, proof3);
+        let ready = sequencer.add_proof(2, proof3, HashedPostState::default());
        assert_eq!(ready.len(), 0);
        assert!(sequencer.has_pending());
 
-        let ready = sequencer.add_proof(0, proof1);
+        let ready = sequencer.add_proof(0, proof1, HashedPostState::default());
        assert_eq!(ready.len(), 1);
        assert!(sequencer.has_pending());
 
-        let ready = sequencer.add_proof(1, proof2);
+        let ready = sequencer.add_proof(1, proof2, HashedPostState::default());
        assert_eq!(ready.len(), 2);
        assert!(!sequencer.has_pending());
    }
@@ -897,10 +771,10 @@ mod tests {
        let proof3 = MultiProof::default();
        sequencer.next_sequence = 3;
 
-        let ready = sequencer.add_proof(0, proof1);
+        let ready = sequencer.add_proof(0, proof1, HashedPostState::default());
        assert_eq!(ready.len(), 1);
 
-        let ready = sequencer.add_proof(2, proof3);
+        let ready = sequencer.add_proof(2, proof3, HashedPostState::default());
        assert_eq!(ready.len(), 0);
        assert!(sequencer.has_pending());
    }
@@ -911,10 +785,10 @@ mod tests {
        let proof1 = MultiProof::default();
        let proof2 = MultiProof::default();
 
-        let ready = sequencer.add_proof(0, proof1);
+        let ready = sequencer.add_proof(0, proof1, HashedPostState::default());
        assert_eq!(ready.len(), 1);
 
-        let ready = sequencer.add_proof(0, proof2);
+        let ready = sequencer.add_proof(0, proof2, HashedPostState::default());
        assert_eq!(ready.len(), 0);
        assert!(!sequencer.has_pending());
    }
@@ -925,12 +799,12 @@ mod tests {
        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
        sequencer.next_sequence = 5;
 
-        sequencer.add_proof(4, proofs[4].clone());
-        sequencer.add_proof(2, proofs[2].clone());
-        sequencer.add_proof(1, proofs[1].clone());
-        sequencer.add_proof(3, proofs[3].clone());
+        sequencer.add_proof(4, proofs[4].clone(), HashedPostState::default());
+        sequencer.add_proof(2, proofs[2].clone(), HashedPostState::default());
+        sequencer.add_proof(1, proofs[1].clone(), HashedPostState::default());
+        sequencer.add_proof(3, proofs[3].clone(), HashedPostState::default());
 
-        let ready = sequencer.add_proof(0, proofs[0].clone());
+        let ready = sequencer.add_proof(0, proofs[0].clone(), HashedPostState::default());
        assert_eq!(ready.len(), 5);
        assert!(!sequencer.has_pending());
    }