perf(trie): deduplicate already fetched prefetch targets (#14223)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
Dan Cline
2025-02-05 17:30:49 -05:00
committed by GitHub
parent 2c3faf9b2b
commit 06132f509c

View File

@ -555,17 +555,68 @@ where
/// Handles request for proof prefetch.
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) {
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &targets);
let proof_targets = self.get_prefetch_proof_targets(targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
hashed_state_update: Default::default(),
proof_targets: targets,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
}
/// Calls `get_proof_targets` with existing proof targets for prefetching.
fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
// Here we want to filter out any targets that are already fetched
//
// This means we need to remove any storage slots that have already been fetched
let mut duplicates = 0;
// First remove all storage targets that are subsets of already fetched storage slots
targets.retain(|hashed_address, target_storage| {
let keep = self
.fetched_proof_targets
.get(hashed_address)
// do NOT remove if None, because that means the account has not been fetched yet
.is_none_or(|fetched_storage| {
// remove if a subset
!target_storage.is_subset(fetched_storage)
});
if !keep {
duplicates += target_storage.len();
}
keep
});
// For all non-subset remaining targets, we have to calculate the difference
for (hashed_address, target_storage) in &mut targets {
let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
// this means the account has not been fetched yet, so we must fetch everything
// associated with this account
continue
};
let prev_target_storage_len = target_storage.len();
// keep only the storage slots that have not been fetched yet
//
// we already removed subsets, so this should only remove duplicates
target_storage.retain(|slot| !fetched_storage.contains(slot));
duplicates += prev_target_storage_len - target_storage.len();
}
if duplicates > 0 {
trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
}
targets
}
/// Handles state updates.
///
/// Returns proof targets derived from the state update.
@ -974,6 +1025,7 @@ fn extend_multi_proof_targets_ref(targets: &mut MultiProofTargets, other: &Multi
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::map::B256Set;
use reth_primitives_traits::{Account as RethAccount, StorageEntry};
use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter,
@ -1041,6 +1093,42 @@ mod tests {
updates
}
fn create_state_root_config<F>(factory: F, input: TrieInput) -> StateRootConfig<F>
where
F: DatabaseProviderFactory<Provider: BlockReader>
+ StateCommitmentProvider
+ Clone
+ 'static,
{
let consistent_view = ConsistentDbView::new(factory, None);
let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
let state_sorted = Arc::new(input.state.clone().into_sorted());
let prefix_sets = Arc::new(input.prefix_sets);
StateRootConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
}
fn create_test_state_root_task<F>(factory: F) -> StateRootTask<F>
where
F: DatabaseProviderFactory<Provider: BlockReader>
+ StateCommitmentProvider
+ Clone
+ 'static,
{
let num_threads = thread_pool_size();
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("test-worker-{}", i))
.build()
.expect("Failed to create test proof worker thread pool");
let thread_pool = Arc::new(thread_pool);
let config = create_state_root_config(factory, TrieInput::default());
StateRootTask::new(config, thread_pool)
}
#[test]
fn test_state_root_task() {
reth_tracing::init_test_tracing();
@ -1371,4 +1459,85 @@ mod tests {
assert!(target_slots.contains(&slot1));
assert!(target_slots.contains(&slot2));
}
#[test]
fn test_get_prefetch_proof_targets_no_duplicates() {
let test_provider_factory = create_test_provider_factory();
let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
// populate some targets
let mut targets = MultiProofTargets::default();
let addr1 = B256::random();
let addr2 = B256::random();
let slot1 = B256::random();
let slot2 = B256::random();
targets.insert(addr1, vec![slot1].into_iter().collect());
targets.insert(addr2, vec![slot2].into_iter().collect());
let prefetch_proof_targets =
test_state_root_task.get_prefetch_proof_targets(targets.clone());
// check that the prefetch proof targets are the same because there are no fetched proof
// targets yet
assert_eq!(prefetch_proof_targets, targets);
// add a different addr and slot to fetched proof targets
let addr3 = B256::random();
let slot3 = B256::random();
test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect());
let prefetch_proof_targets =
test_state_root_task.get_prefetch_proof_targets(targets.clone());
// check that the prefetch proof targets are the same because the fetched proof targets
// don't overlap with the prefetch targets
assert_eq!(prefetch_proof_targets, targets);
}
#[test]
fn test_get_prefetch_proof_targets_remove_subset() {
let test_provider_factory = create_test_provider_factory();
let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
// populate some targe
let mut targets = MultiProofTargets::default();
let addr1 = B256::random();
let addr2 = B256::random();
let slot1 = B256::random();
let slot2 = B256::random();
targets.insert(addr1, vec![slot1].into_iter().collect());
targets.insert(addr2, vec![slot2].into_iter().collect());
// add a subset of the first target to fetched proof targets
test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect());
let prefetch_proof_targets =
test_state_root_task.get_prefetch_proof_targets(targets.clone());
// check that the prefetch proof targets do not include the subset
assert_eq!(prefetch_proof_targets.len(), 1);
assert!(!prefetch_proof_targets.contains_key(&addr1));
assert!(prefetch_proof_targets.contains_key(&addr2));
// now add one more slot to the prefetch targets
let slot3 = B256::random();
targets.get_mut(&addr1).unwrap().insert(slot3);
let prefetch_proof_targets =
test_state_root_task.get_prefetch_proof_targets(targets.clone());
// check that the prefetch proof targets do not include the subset
// but include the new slot
assert_eq!(prefetch_proof_targets.len(), 2);
assert!(prefetch_proof_targets.contains_key(&addr1));
assert_eq!(
*prefetch_proof_targets.get(&addr1).unwrap(),
vec![slot3].into_iter().collect::<B256Set>()
);
assert!(prefetch_proof_targets.contains_key(&addr2));
assert_eq!(
*prefetch_proof_targets.get(&addr2).unwrap(),
vec![slot2].into_iter().collect::<B256Set>()
);
}
}