From e3d8ceb4be9e3e8b528dab7a7dc07281be596046 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:49:08 +0100 Subject: [PATCH] feat: add `ETL` to `HistoryStages` (#7249) --- bin/reth/src/commands/stage/drop.rs | 234 +++++++++-------- crates/etl/src/lib.rs | 4 - crates/node-core/src/node_config.rs | 2 + .../src/stages/index_account_history.rs | 81 ++++-- .../src/stages/index_storage_history.rs | 86 ++++-- crates/stages/src/stages/mod.rs | 4 + crates/stages/src/stages/utils.rs | 244 ++++++++++++++++++ crates/storage/db/src/abstraction/table.rs | 5 +- .../storage/db/src/tables/models/accounts.rs | 62 ++++- crates/storage/db/src/tables/utils.rs | 43 +-- 10 files changed, 576 insertions(+), 189 deletions(-) create mode 100644 crates/stages/src/stages/utils.rs diff --git a/bin/reth/src/commands/stage/drop.rs b/bin/reth/src/commands/stage/drop.rs index 3e90d31d8..3e33183cb 100644 --- a/bin/reth/src/commands/stage/drop.rs +++ b/bin/reth/src/commands/stage/drop.rs @@ -10,11 +10,8 @@ use crate::{ }; use clap::Parser; use itertools::Itertools; -use reth_db::{ - database::Database, open_db, static_file::iter_static_files, tables, transaction::DbTxMut, - DatabaseEnv, -}; -use reth_node_core::init::{insert_genesis_header, insert_genesis_state}; +use reth_db::{open_db, static_file::iter_static_files, tables, transaction::DbTxMut, DatabaseEnv}; +use reth_node_core::init::{insert_genesis_header, insert_genesis_history, insert_genesis_state}; use reth_primitives::{ fs, stage::StageId, static_file::find_fixed_range, ChainSpec, StaticFileSegment, }; @@ -91,124 +88,125 @@ impl Command { } } - tool.provider_factory.db_ref().update(|tx| { - match self.stage { - StageEnum::Headers => { - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.put::( - StageId::Headers.to_string(), - Default::default(), - )?; - insert_genesis_header::(tx, &static_file_provider, self.chain)?; - } - StageEnum::Bodies => { - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.put::( - StageId::Bodies.to_string(), - Default::default(), - )?; - insert_genesis_header::(tx, &static_file_provider, self.chain)?; - } - StageEnum::Senders => { - tx.clear::()?; - tx.put::( - StageId::SenderRecovery.to_string(), - Default::default(), - )?; - } - StageEnum::Execution => { - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.clear::()?; - tx.put::( - StageId::Execution.to_string(), - Default::default(), - )?; - insert_genesis_state::(tx, self.chain.genesis())?; - } - StageEnum::AccountHashing => { - tx.clear::()?; - tx.put::( - StageId::AccountHashing.to_string(), - Default::default(), - )?; - } - StageEnum::StorageHashing => { - tx.clear::()?; - tx.put::( - StageId::StorageHashing.to_string(), - Default::default(), - )?; - } - StageEnum::Hashing => { - // Clear hashed accounts - tx.clear::()?; - tx.put::( - StageId::AccountHashing.to_string(), - Default::default(), - )?; + let provider_rw = tool.provider_factory.provider_rw()?; + let tx = provider_rw.tx_ref(); - // Clear hashed storages - tx.clear::()?; - tx.put::( - StageId::StorageHashing.to_string(), - Default::default(), - )?; - } - StageEnum::Merkle => { - tx.clear::()?; - tx.clear::()?; - tx.put::( - StageId::MerkleExecute.to_string(), - Default::default(), - )?; - tx.put::( - StageId::MerkleUnwind.to_string(), - Default::default(), - )?; - tx.delete::( - StageId::MerkleExecute.to_string(), - None, - )?; - } - StageEnum::AccountHistory | StageEnum::StorageHistory => { - tx.clear::()?; - tx.clear::()?; - tx.put::( - StageId::IndexAccountHistory.to_string(), - Default::default(), - )?; - tx.put::( - StageId::IndexStorageHistory.to_string(), - Default::default(), - )?; - } - StageEnum::TxLookup => { - tx.clear::()?; - tx.put::( - StageId::TransactionLookup.to_string(), - Default::default(), - )?; - insert_genesis_header::(tx, &static_file_provider, self.chain)?; - } + match self.stage { + StageEnum::Headers => { + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.put::( + StageId::Headers.to_string(), + Default::default(), + )?; + insert_genesis_header::(tx, &static_file_provider, self.chain)?; } + StageEnum::Bodies => { + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.put::( + StageId::Bodies.to_string(), + Default::default(), + )?; + insert_genesis_header::(tx, &static_file_provider, self.chain)?; + } + StageEnum::Senders => { + tx.clear::()?; + tx.put::( + StageId::SenderRecovery.to_string(), + Default::default(), + )?; + } + StageEnum::Execution => { + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.clear::()?; + tx.put::( + StageId::Execution.to_string(), + Default::default(), + )?; + insert_genesis_state::(tx, self.chain.genesis())?; + } + StageEnum::AccountHashing => { + tx.clear::()?; + tx.put::( + StageId::AccountHashing.to_string(), + Default::default(), + )?; + } + StageEnum::StorageHashing => { + tx.clear::()?; + tx.put::( + StageId::StorageHashing.to_string(), + Default::default(), + )?; + } + StageEnum::Hashing => { + // Clear hashed accounts + tx.clear::()?; + tx.put::( + StageId::AccountHashing.to_string(), + Default::default(), + )?; - tx.put::(StageId::Finish.to_string(), Default::default())?; + // Clear hashed storages + tx.clear::()?; + tx.put::( + StageId::StorageHashing.to_string(), + Default::default(), + )?; + } + StageEnum::Merkle => { + tx.clear::()?; + tx.clear::()?; + tx.put::( + StageId::MerkleExecute.to_string(), + Default::default(), + )?; + tx.put::( + StageId::MerkleUnwind.to_string(), + Default::default(), + )?; + tx.delete::( + StageId::MerkleExecute.to_string(), + None, + )?; + } + StageEnum::AccountHistory | StageEnum::StorageHistory => { + tx.clear::()?; + tx.clear::()?; + tx.put::( + StageId::IndexAccountHistory.to_string(), + Default::default(), + )?; + tx.put::( + StageId::IndexStorageHistory.to_string(), + Default::default(), + )?; + insert_genesis_history(&provider_rw, &self.chain.genesis)?; + } + StageEnum::TxLookup => { + tx.clear::()?; + tx.put::( + StageId::TransactionLookup.to_string(), + Default::default(), + )?; + insert_genesis_header::(tx, &static_file_provider, self.chain)?; + } + } - static_file_provider.commit()?; + tx.put::(StageId::Finish.to_string(), Default::default())?; - Ok::<_, eyre::Error>(()) - })??; + static_file_provider.commit()?; + provider_rw.commit()?; Ok(()) } diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs index 595ae02c6..7d233cbc6 100644 --- a/crates/etl/src/lib.rs +++ b/crates/etl/src/lib.rs @@ -39,8 +39,6 @@ pub struct Collector where K: Encode + Ord, V: Compress, - ::Encoded: std::fmt::Debug, - ::Compressed: std::fmt::Debug, { /// Parent directory where to create ETL files parent_dir: Option, @@ -62,8 +60,6 @@ impl Collector where K: Key, V: Value, - ::Encoded: Ord + std::fmt::Debug, - ::Compressed: Ord + std::fmt::Debug, { /// Create a new collector with some capacity. /// diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 319128732..4078c29d9 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -875,10 +875,12 @@ impl NodeConfig { .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, prune_modes.account_history, + stage_config.etl.clone(), )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, prune_modes.storage_history, + stage_config.etl.clone(), )), ) .build(provider_factory, static_file_producer); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 2d8349386..76d01dbdd 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,18 +1,22 @@ +use super::{collect_history_indices, load_history_indices}; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::database::Database; +use reth_config::config::EtlConfig; +use reth_db::{ + database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut, +}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, - PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, + Address, PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, }; use reth_provider::{ - AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, - PruneCheckpointWriter, + DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, }; use std::fmt::Debug; +use tracing::info; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information -/// on index sharding take a look at [`reth_db::tables::AccountsHistory`] +/// on index sharding take a look at [`tables::AccountsHistory`] #[derive(Debug)] pub struct IndexAccountHistoryStage { /// Number of blocks after which the control @@ -20,18 +24,24 @@ pub struct IndexAccountHistoryStage { pub commit_threshold: u64, /// Pruning configuration. pub prune_mode: Option, + /// ETL configuration + pub etl_config: EtlConfig, } impl IndexAccountHistoryStage { /// Create new instance of [IndexAccountHistoryStage]. - pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { - Self { commit_threshold, prune_mode } + pub fn new( + commit_threshold: u64, + prune_mode: Option, + etl_config: EtlConfig, + ) -> Self { + Self { commit_threshold, prune_mode, etl_config } } } impl Default for IndexAccountHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000, prune_mode: None } + Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() } } } @@ -81,13 +91,37 @@ impl Stage for IndexAccountHistoryStage { return Ok(ExecOutput::done(input.checkpoint())) } - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let mut range = input.next_block_range(); + let first_sync = input.checkpoint().block_number == 0; - let indices = provider.changed_accounts_and_blocks_with_range(range.clone())?; - // Insert changeset to history index - provider.insert_account_history_index(indices)?; + // On first sync we might have history coming from genesis. We clear the table since it's + // faster to rebuild from scratch. + if first_sync { + provider.tx_ref().clear::()?; + range = 0..=*input.next_block_range().end(); + } - Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) + info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices"); + let collector = + collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>( + provider.tx_ref(), + range.clone(), + ShardedKey::new, + |(index, value)| (index, value.address), + &self.etl_config, + )?; + + info!(target: "sync::stages::index_account_history::exec", "Loading indices into database"); + load_history_indices::<_, tables::AccountsHistory, _>( + provider.tx_ref(), + collector, + first_sync, + ShardedKey::new, + ShardedKey::
::decode, + |key| key.key, + )?; + + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } /// Unwind the stage. @@ -117,18 +151,17 @@ mod tests { use reth_db::{ cursor::DbCursorRO, models::{ - sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey, + sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, StoredBlockBodyIndices, }, - tables, - transaction::{DbTx, DbTxMut}, + transaction::DbTx, BlockNumberList, }; use reth_interfaces::test_utils::{ generators, generators::{random_block_range, random_changeset_range, random_contract_account_range}, }; - use reth_primitives::{address, Address, BlockNumber, B256}; + use reth_primitives::{address, BlockNumber, B256}; use reth_provider::providers::StaticFileWriter; use std::collections::BTreeMap; @@ -205,7 +238,7 @@ mod tests { } #[tokio::test] - async fn insert_index_to_empty() { + async fn insert_index_to_genesis() { // init let db = TestStageDB::default(); @@ -217,14 +250,14 @@ mod tests { // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])])); // unwind unwind(&db, 3, 0); // verify initial state - let table = db.table::().unwrap(); - assert!(table.is_empty()); + let table = cast(db.table::().unwrap()); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])])); } #[tokio::test] @@ -484,7 +517,11 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode } + Self::S { + commit_threshold: self.commit_threshold, + prune_mode: self.prune_mode, + etl_config: EtlConfig::default(), + } } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 03b32607c..c00de1632 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,17 +1,26 @@ +use super::{collect_history_indices, load_history_indices}; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::{database::Database, models::BlockNumberAddress}; +use reth_config::config::EtlConfig; +use reth_db::{ + database::Database, + models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, + table::Decode, + tables, + transaction::DbTxMut, +}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, }; use reth_provider::{ - DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader, + DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, }; use std::fmt::Debug; +use tracing::info; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information -/// on index sharding take a look at [`reth_db::tables::StoragesHistory`]. +/// on index sharding take a look at [`tables::StoragesHistory`]. #[derive(Debug)] pub struct IndexStorageHistoryStage { /// Number of blocks after which the control @@ -19,18 +28,24 @@ pub struct IndexStorageHistoryStage { pub commit_threshold: u64, /// Pruning configuration. pub prune_mode: Option, + /// ETL configuration + pub etl_config: EtlConfig, } impl IndexStorageHistoryStage { /// Create new instance of [IndexStorageHistoryStage]. - pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { - Self { commit_threshold, prune_mode } + pub fn new( + commit_threshold: u64, + prune_mode: Option, + etl_config: EtlConfig, + ) -> Self { + Self { commit_threshold, prune_mode, etl_config } } } impl Default for IndexStorageHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000, prune_mode: None } + Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() } } } @@ -80,12 +95,41 @@ impl Stage for IndexStorageHistoryStage { return Ok(ExecOutput::done(input.checkpoint())) } - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let mut range = input.next_block_range(); + let first_sync = input.checkpoint().block_number == 0; - let indices = provider.changed_storages_and_blocks_with_range(range.clone())?; - provider.insert_storage_history_index(indices)?; + // On first sync we might have history coming from genesis. We clear the table since it's + // faster to rebuild from scratch. + if first_sync { + provider.tx_ref().clear::()?; + range = 0..=*input.next_block_range().end(); + } - Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) + info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices"); + let collector = + collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>( + provider.tx_ref(), + BlockNumberAddress::range(range.clone()), + |AddressStorageKey((address, storage_key)), highest_block_number| { + StorageShardedKey::new(address, storage_key, highest_block_number) + }, + |(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))), + &self.etl_config, + )?; + + info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database"); + load_history_indices::<_, tables::StoragesHistory, _>( + provider.tx_ref(), + collector, + first_sync, + |AddressStorageKey((address, storage_key)), highest_block_number| { + StorageShardedKey::new(address, storage_key, highest_block_number) + }, + StorageShardedKey::decode, + |key| AddressStorageKey((key.address, key.sharded_key.key)), + )?; + + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } /// Unwind the stage. @@ -114,12 +158,10 @@ mod tests { use reth_db::{ cursor::DbCursorRO, models::{ - sharded_key, - storage_sharded_key::{StorageShardedKey, NUM_OF_INDICES_IN_SHARD}, - ShardedKey, StoredBlockBodyIndices, + sharded_key, storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey, + StoredBlockBodyIndices, }, - tables, - transaction::{DbTx, DbTxMut}, + transaction::DbTx, BlockNumberList, }; use reth_interfaces::test_utils::{ @@ -216,7 +258,7 @@ mod tests { } #[tokio::test] - async fn insert_index_to_empty() { + async fn insert_index_to_genesis() { // init let db = TestStageDB::default(); @@ -228,14 +270,14 @@ mod tests { // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])])); // unwind unwind(&db, 5, 0); // verify initial state - let table = db.table::().unwrap(); - assert!(table.is_empty()); + let table = cast(db.table::().unwrap()); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])])); } #[tokio::test] @@ -498,7 +540,11 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode } + Self::S { + commit_threshold: self.commit_threshold, + prune_mode: self.prune_mode, + etl_config: EtlConfig::default(), + } } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index ebb88f2ce..8d97491f1 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -30,9 +30,13 @@ pub use headers::*; pub use index_account_history::*; pub use index_storage_history::*; pub use merkle::*; + pub use sender_recovery::*; pub use tx_lookup::*; +mod utils; +use utils::*; + #[cfg(test)] mod tests { use super::*; diff --git a/crates/stages/src/stages/utils.rs b/crates/stages/src/stages/utils.rs new file mode 100644 index 000000000..a37e38afc --- /dev/null +++ b/crates/stages/src/stages/utils.rs @@ -0,0 +1,244 @@ +//! Utils for `stages`. +use crate::StageError; +use reth_config::config::EtlConfig; +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + models::sharded_key::NUM_OF_INDICES_IN_SHARD, + table::{Decompress, Table}, + transaction::{DbTx, DbTxMut}, + BlockNumberList, DatabaseError, +}; +use reth_etl::Collector; +use reth_primitives::BlockNumber; +use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; +use tracing::info; + +/// Number of blocks before pushing indices from cache to [`Collector`] +const DEFAULT_CACHE_THRESHOLD: u64 = 100_000; + +/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a +/// [`Collector`]. +/// +/// ## Process +/// The function utilizes a `HashMap` cache with a structure of `PartialKey` (`P`) (Address or +/// Address.StorageKey) to `BlockNumberList`. When the cache exceeds its capacity, its contents are +/// moved to a [`Collector`]. Here, each entry's key is a concatenation of `PartialKey` and the +/// highest block number in its list. +/// +/// ## Example +/// 1. Initial Cache State: `{ Address1: [1,2,3], ... }` +/// 2. Cache is flushed to the `Collector`. +/// 3. Updated Cache State: `{ Address1: [100,300], ... }` +/// 4. Cache is flushed again. +/// +/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and +/// `(Address1.300, [100,300])`. The entries may be stored across one or more files. +pub(crate) fn collect_history_indices( + tx: &TX, + range: impl RangeBounds, + sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key, + partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P), + etl_config: &EtlConfig, +) -> Result, StageError> +where + TX: DbTxMut + DbTx, + CS: Table, + H: Table, + P: Copy + Eq + Hash, +{ + let mut changeset_cursor = tx.cursor_read::()?; + + let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone()); + let mut cache: HashMap> = HashMap::new(); + + let mut collect = |cache: &HashMap>| { + for (key, indice_list) in cache { + let last = indice_list.last().expect("qed"); + collector.insert( + sharded_key_factory(*key, *last), + BlockNumberList::new_pre_sorted(indice_list), + )?; + } + Ok::<(), StageError>(()) + }; + + // observability + let total_changesets = tx.entries::()?; + let interval = (total_changesets / 100).max(1); + + let mut flush_counter = 0; + let mut current_block_number = u64::MAX; + for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() { + let (block_number, key) = partial_key_factory(entry?); + cache.entry(key).or_default().push(block_number); + + if idx > 0 && idx % interval == 0 && total_changesets > 100 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); + } + + // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks. + if current_block_number != block_number { + current_block_number = block_number; + flush_counter += 1; + if flush_counter > DEFAULT_CACHE_THRESHOLD { + collect(&cache)?; + cache.clear(); + flush_counter = 0; + } + } + } + collect(&cache)?; + + Ok(collector) +} + +/// Given a [`Collector`] created by [`collect_history_indices`] it iterates all entries, loading +/// the indices into the database in shards. +/// +/// ## Process +/// Iterates over elements, grouping indices by their partial keys (e.g., `Address` or +/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length +/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial +/// key shard is stored. +pub(crate) fn load_history_indices( + tx: &TX, + mut collector: Collector, + append_only: bool, + sharded_key_factory: impl Clone + Fn(P, u64) -> ::Key, + decode_key: impl Fn(Vec) -> Result<::Key, DatabaseError>, + get_partial: impl Fn(::Key) -> P, +) -> Result<(), StageError> +where + TX: DbTxMut + DbTx, + H: Table, + P: Copy + Default + Eq, +{ + let mut write_cursor = tx.cursor_write::()?; + let mut current_partial = P::default(); + let mut current_list = Vec::::new(); + + // observability + let total_entries = collector.len(); + let interval = (total_entries / 100).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = decode_key(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index % interval == 0 && total_entries > 100 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); + } + + // AccountsHistory: `Address`. + // StorageHistory: `Address.StorageKey`. + let partial_key = get_partial(sharded_key); + + if current_partial != partial_key { + // We have reached the end of this subset of keys so + // we need to flush its last indice shard. + load_indices( + &mut write_cursor, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + current_partial = partial_key; + current_list.clear(); + + // If it's not the first sync, there might an existing shard already, so we need to + // merge it with the one coming from the collector + if !append_only { + if let Some((_, last_database_shard)) = + write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))? + { + current_list.extend(last_database_shard.iter()); + } + } + } + + current_list.extend(new_list.iter()); + load_indices( + &mut write_cursor, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::KeepLast, + )?; + } + + // There will be one remaining shard that needs to be flushed to DB. + load_indices( + &mut write_cursor, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + Ok(()) +} + +/// Shard and insert the indice list according to [`LoadMode`] and its length. +pub(crate) fn load_indices( + cursor: &mut C, + partial_key: P, + list: &mut Vec, + sharded_key_factory: &impl Fn(P, BlockNumber) -> ::Key, + append_only: bool, + mode: LoadMode, +) -> Result<(), StageError> +where + C: DbCursorRO + DbCursorRW, + H: Table, + P: Copy, +{ + if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() { + let chunks = list + .chunks(NUM_OF_INDICES_IN_SHARD) + .map(|chunks| chunks.to_vec()) + .collect::>>(); + + let mut iter = chunks.into_iter().peekable(); + while let Some(chunk) = iter.next() { + let mut highest = *chunk.last().expect("at least one index"); + + if !mode.is_flush() && iter.peek().is_none() { + *list = chunk; + } else { + if iter.peek().is_none() { + highest = u64::MAX; + } + let key = sharded_key_factory(partial_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk); + + if append_only { + cursor.append(key, value)?; + } else { + cursor.upsert(key, value)?; + } + } + } + } + + Ok(()) +} + +/// Mode on how to load index shards into the database. +pub(crate) enum LoadMode { + /// Keep the last shard in memory and don't flush it to the database. + KeepLast, + /// Flush all shards into the database. + Flush, +} + +impl LoadMode { + fn is_flush(&self) -> bool { + matches!(self, Self::Flush) + } +} diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs index a68aa9bb0..862664d70 100644 --- a/crates/storage/db/src/abstraction/table.rs +++ b/crates/storage/db/src/abstraction/table.rs @@ -16,7 +16,8 @@ pub trait Compress: Send + Sync + Sized + Debug { + Into> + Default + Send - + Sync; + + Sync + + Debug; /// If the type cannot be compressed, return its inner reference as `Some(self.as_ref())` fn uncompressable_ref(&self) -> Option<&[u8]> { @@ -48,7 +49,7 @@ pub trait Decompress: Send + Sync + Sized + Debug { /// Trait that will transform the data to be saved in the DB. pub trait Encode: Send + Sync + Sized + Debug { /// Encoded type. - type Encoded: AsRef<[u8]> + Into> + Send + Sync; + type Encoded: AsRef<[u8]> + Into> + Send + Sync + Ord + Debug; /// Encodes data going into the database. fn encode(self) -> Self::Encoded; diff --git a/crates/storage/db/src/tables/models/accounts.rs b/crates/storage/db/src/tables/models/accounts.rs index eef4f5c7f..7b0770da2 100644 --- a/crates/storage/db/src/tables/models/accounts.rs +++ b/crates/storage/db/src/tables/models/accounts.rs @@ -8,7 +8,7 @@ use crate::{ DatabaseError, }; use reth_codecs::{derive_arbitrary, Compact}; -use reth_primitives::{Account, Address, BlockNumber, Buf}; +use reth_primitives::{Account, Address, BlockNumber, Buf, StorageKey}; use serde::{Deserialize, Serialize}; /// Account as it is saved inside [`AccountChangeSets`][crate::tables::AccountChangeSets]. @@ -121,7 +121,40 @@ impl Decode for BlockNumberAddress { } } -impl_fixed_arbitrary!(BlockNumberAddress, 28); +/// [`Address`] concatenated with [`StorageKey`]. Used by `reth_etl` and history stages. +/// +/// Since it's used as a key, it isn't compressed when encoding it. +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash, +)] +pub struct AddressStorageKey(pub (Address, StorageKey)); + +impl Encode for AddressStorageKey { + type Encoded = [u8; 52]; + + fn encode(self) -> Self::Encoded { + let address = self.0 .0; + let storage_key = self.0 .1; + + let mut buf = [0u8; 52]; + + buf[..20].copy_from_slice(address.as_slice()); + buf[20..].copy_from_slice(storage_key.as_slice()); + buf + } +} + +impl Decode for AddressStorageKey { + fn decode>(value: B) -> Result { + let value = value.as_ref(); + let address = Address::from_slice(&value[..20]); + let storage_key = StorageKey::from_slice(&value[20..]); + + Ok(AddressStorageKey((address, storage_key))) + } +} + +impl_fixed_arbitrary!((BlockNumberAddress, 28), (AddressStorageKey, 52)); #[cfg(test)] mod tests { @@ -153,4 +186,29 @@ mod tests { let key = BlockNumberAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap(); assert_eq!(bytes, Encode::encode(key)); } + + #[test] + fn test_address_storage_key() { + let storage_key = StorageKey::random(); + let address = Address::from_str("ba5e000000000000000000000000000000000000").unwrap(); + let key = AddressStorageKey((address, storage_key)); + + let mut bytes = [0u8; 52]; + bytes[..20].copy_from_slice(address.as_slice()); + bytes[20..].copy_from_slice(storage_key.as_slice()); + + let encoded = Encode::encode(key); + assert_eq!(encoded, bytes); + + let decoded: AddressStorageKey = Decode::decode(encoded).unwrap(); + assert_eq!(decoded, key); + } + + #[test] + fn test_address_storage_key_rand() { + let mut bytes = [0u8; 52]; + thread_rng().fill(bytes.as_mut_slice()); + let key = AddressStorageKey::arbitrary(&mut Unstructured::new(&bytes)).unwrap(); + assert_eq!(bytes, Encode::encode(key)); + } } diff --git a/crates/storage/db/src/tables/utils.rs b/crates/storage/db/src/tables/utils.rs index 331f2905c..753c1f739 100644 --- a/crates/storage/db/src/tables/utils.rs +++ b/crates/storage/db/src/tables/utils.rs @@ -9,34 +9,35 @@ use std::borrow::Cow; #[macro_export] /// Implements the `Arbitrary` trait for types with fixed array types. macro_rules! impl_fixed_arbitrary { - ($name:ident, $size:tt) => { + ($(($name:ident, $size:expr)),*) => { #[cfg(any(test, feature = "arbitrary"))] use arbitrary::{Arbitrary, Unstructured}; + $( + #[cfg(any(test, feature = "arbitrary"))] + impl<'a> Arbitrary<'a> for $name { + fn arbitrary(u: &mut Unstructured<'a>) -> Result { + let mut buffer = vec![0; $size]; + u.fill_buffer(buffer.as_mut_slice())?; - #[cfg(any(test, feature = "arbitrary"))] - impl<'a> Arbitrary<'a> for $name { - fn arbitrary(u: &mut Unstructured<'a>) -> Result { - let mut buffer = vec![0; $size]; - u.fill_buffer(buffer.as_mut_slice())?; - - Decode::decode(buffer).map_err(|_| arbitrary::Error::IncorrectFormat) + Decode::decode(buffer).map_err(|_| arbitrary::Error::IncorrectFormat) + } } - } - #[cfg(any(test, feature = "arbitrary"))] - impl proptest::prelude::Arbitrary for $name { - type Parameters = (); - type Strategy = proptest::strategy::Map< - proptest::collection::VecStrategy<::Strategy>, - fn(Vec) -> Self, - >; + #[cfg(any(test, feature = "arbitrary"))] + impl proptest::prelude::Arbitrary for $name { + type Parameters = (); + type Strategy = proptest::strategy::Map< + proptest::collection::VecStrategy<::Strategy>, + fn(Vec) -> Self, + >; - fn arbitrary_with(args: Self::Parameters) -> Self::Strategy { - use proptest::strategy::Strategy; - proptest::collection::vec(proptest::arbitrary::any_with::(args), $size) - .prop_map(move |vec| Decode::decode(vec).unwrap()) + fn arbitrary_with(args: Self::Parameters) -> Self::Strategy { + use proptest::strategy::Strategy; + proptest::collection::vec(proptest::arbitrary::any_with::(args), $size) + .prop_map(move |vec| Decode::decode(vec).unwrap()) + } } - } + )+ }; }