From 06132f509cdc498d01ccbec9eed4fbbf85bd1e0e Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 5 Feb 2025 17:30:49 -0500 Subject: [PATCH] perf(trie): deduplicate already fetched prefetch targets (#14223) Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> --- crates/engine/tree/src/tree/root.rs | 173 +++++++++++++++++++++++++++- 1 file changed, 171 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 26304b53e..4de738f24 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -555,17 +555,68 @@ where /// Handles request for proof prefetch. fn on_prefetch_proof(&mut self, targets: MultiProofTargets) { - extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &targets); + let proof_targets = self.get_prefetch_proof_targets(targets); + extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets); self.multiproof_manager.spawn_or_queue(MultiproofInput { config: self.config.clone(), hashed_state_update: Default::default(), - proof_targets: targets, + proof_targets, proof_sequence_number: self.proof_sequencer.next_sequence(), state_root_message_sender: self.tx.clone(), }); } + /// Filters out prefetch proof targets that have already been fetched, returning only the targets that still need a proof. 
+ fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets { + // Here we want to filter out any targets that are already fetched + // + // This means we need to remove any storage slots that have already been fetched + let mut duplicates = 0; + + // First remove all storage targets that are subsets of already fetched storage slots + targets.retain(|hashed_address, target_storage| { + let keep = self + .fetched_proof_targets + .get(hashed_address) + // do NOT remove if None, because that means the account has not been fetched yet + .is_none_or(|fetched_storage| { + // remove if a subset + !target_storage.is_subset(fetched_storage) + }); + + if !keep { + duplicates += target_storage.len(); + } + + keep + }); + + // For all non-subset remaining targets, we have to calculate the difference + for (hashed_address, target_storage) in &mut targets { + let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else { + // this means the account has not been fetched yet, so we must fetch everything + // associated with this account + continue + }; + + let prev_target_storage_len = target_storage.len(); + + // keep only the storage slots that have not been fetched yet + // + // we already removed subsets, so this should only remove duplicates + target_storage.retain(|slot| !fetched_storage.contains(slot)); + + duplicates += prev_target_storage_len - target_storage.len(); + } + + if duplicates > 0 { + trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets"); + } + + targets + } + /// Handles state updates. /// /// Returns proof targets derived from the state update. 
@@ -974,6 +1025,7 @@ fn extend_multi_proof_targets_ref(targets: &mut MultiProofTargets, other: &Multi #[cfg(test)] mod tests { use super::*; + use alloy_primitives::map::B256Set; use reth_primitives_traits::{Account as RethAccount, StorageEntry}; use reth_provider::{ providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, @@ -1041,6 +1093,42 @@ mod tests { updates } + fn create_state_root_config(factory: F, input: TrieInput) -> StateRootConfig + where + F: DatabaseProviderFactory + + StateCommitmentProvider + + Clone + + 'static, + { + let consistent_view = ConsistentDbView::new(factory, None); + let nodes_sorted = Arc::new(input.nodes.clone().into_sorted()); + let state_sorted = Arc::new(input.state.clone().into_sorted()); + let prefix_sets = Arc::new(input.prefix_sets); + + StateRootConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets } + } + + fn create_test_state_root_task(factory: F) -> StateRootTask + where + F: DatabaseProviderFactory + + StateCommitmentProvider + + Clone + + 'static, + { + let num_threads = thread_pool_size(); + + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("test-worker-{}", i)) + .build() + .expect("Failed to create test proof worker thread pool"); + + let thread_pool = Arc::new(thread_pool); + let config = create_state_root_config(factory, TrieInput::default()); + + StateRootTask::new(config, thread_pool) + } + #[test] fn test_state_root_task() { reth_tracing::init_test_tracing(); @@ -1371,4 +1459,85 @@ mod tests { assert!(target_slots.contains(&slot1)); assert!(target_slots.contains(&slot2)); } + + #[test] + fn test_get_prefetch_proof_targets_no_duplicates() { + let test_provider_factory = create_test_provider_factory(); + let mut test_state_root_task = create_test_state_root_task(test_provider_factory); + + // populate some targets + let mut targets = MultiProofTargets::default(); + let addr1 = B256::random(); + let addr2 = 
B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + targets.insert(addr1, vec![slot1].into_iter().collect()); + targets.insert(addr2, vec![slot2].into_iter().collect()); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets are the same because there are no fetched proof + // targets yet + assert_eq!(prefetch_proof_targets, targets); + + // add a different addr and slot to fetched proof targets + let addr3 = B256::random(); + let slot3 = B256::random(); + test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect()); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets are the same because the fetched proof targets + // don't overlap with the prefetch targets + assert_eq!(prefetch_proof_targets, targets); + } + + #[test] + fn test_get_prefetch_proof_targets_remove_subset() { + let test_provider_factory = create_test_provider_factory(); + let mut test_state_root_task = create_test_state_root_task(test_provider_factory); + + // populate some targets + let mut targets = MultiProofTargets::default(); + let addr1 = B256::random(); + let addr2 = B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + targets.insert(addr1, vec![slot1].into_iter().collect()); + targets.insert(addr2, vec![slot2].into_iter().collect()); + + // add a subset of the first target to fetched proof targets + test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect()); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets do not include the subset + assert_eq!(prefetch_proof_targets.len(), 1); + assert!(!prefetch_proof_targets.contains_key(&addr1)); + assert!(prefetch_proof_targets.contains_key(&addr2)); + + // now add 
one more slot to the prefetch targets + let slot3 = B256::random(); + targets.get_mut(&addr1).unwrap().insert(slot3); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets do not include the subset + // but include the new slot + assert_eq!(prefetch_proof_targets.len(), 2); + assert!(prefetch_proof_targets.contains_key(&addr1)); + assert_eq!( + *prefetch_proof_targets.get(&addr1).unwrap(), + vec![slot3].into_iter().collect::<B256Set>() + ); + assert!(prefetch_proof_targets.contains_key(&addr2)); + assert_eq!( + *prefetch_proof_targets.get(&addr2).unwrap(), + vec![slot2].into_iter().collect::<B256Set>() + ); + } }