feat: add ETL to HistoryStages (#7249)

This commit is contained in:
joshieDo
2024-03-26 17:49:08 +01:00
committed by GitHub
parent 96e39d29b9
commit e3d8ceb4be
10 changed files with 576 additions and 189 deletions

View File

@ -10,11 +10,8 @@ use crate::{
};
use clap::Parser;
use itertools::Itertools;
use reth_db::{
database::Database, open_db, static_file::iter_static_files, tables, transaction::DbTxMut,
DatabaseEnv,
};
use reth_node_core::init::{insert_genesis_header, insert_genesis_state};
use reth_db::{open_db, static_file::iter_static_files, tables, transaction::DbTxMut, DatabaseEnv};
use reth_node_core::init::{insert_genesis_header, insert_genesis_history, insert_genesis_state};
use reth_primitives::{
fs, stage::StageId, static_file::find_fixed_range, ChainSpec, StaticFileSegment,
};
@ -91,124 +88,125 @@ impl Command {
}
}
tool.provider_factory.db_ref().update(|tx| {
match self.stage {
StageEnum::Headers => {
tx.clear::<tables::CanonicalHeaders>()?;
tx.clear::<tables::Headers>()?;
tx.clear::<tables::HeaderTerminalDifficulties>()?;
tx.clear::<tables::HeaderNumbers>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Headers.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
tx.clear::<tables::Transactions>()?;
tx.clear::<tables::TransactionBlocks>()?;
tx.clear::<tables::BlockOmmers>()?;
tx.clear::<tables::BlockWithdrawals>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Bodies.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
StageEnum::Senders => {
tx.clear::<tables::TransactionSenders>()?;
tx.put::<tables::StageCheckpoints>(
StageId::SenderRecovery.to_string(),
Default::default(),
)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
tx.clear::<tables::Receipts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Execution.to_string(),
Default::default(),
)?;
insert_genesis_state::<DatabaseEnv>(tx, self.chain.genesis())?;
}
StageEnum::AccountHashing => {
tx.clear::<tables::HashedAccounts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::AccountHashing.to_string(),
Default::default(),
)?;
}
StageEnum::StorageHashing => {
tx.clear::<tables::HashedStorages>()?;
tx.put::<tables::StageCheckpoints>(
StageId::StorageHashing.to_string(),
Default::default(),
)?;
}
StageEnum::Hashing => {
// Clear hashed accounts
tx.clear::<tables::HashedAccounts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::AccountHashing.to_string(),
Default::default(),
)?;
let provider_rw = tool.provider_factory.provider_rw()?;
let tx = provider_rw.tx_ref();
// Clear hashed storages
tx.clear::<tables::HashedStorages>()?;
tx.put::<tables::StageCheckpoints>(
StageId::StorageHashing.to_string(),
Default::default(),
)?;
}
StageEnum::Merkle => {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
tx.put::<tables::StageCheckpoints>(
StageId::MerkleExecute.to_string(),
Default::default(),
)?;
tx.put::<tables::StageCheckpoints>(
StageId::MerkleUnwind.to_string(),
Default::default(),
)?;
tx.delete::<tables::StageCheckpointProgresses>(
StageId::MerkleExecute.to_string(),
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;
tx.put::<tables::StageCheckpoints>(
StageId::IndexAccountHistory.to_string(),
Default::default(),
)?;
tx.put::<tables::StageCheckpoints>(
StageId::IndexStorageHistory.to_string(),
Default::default(),
)?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
tx.put::<tables::StageCheckpoints>(
StageId::TransactionLookup.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
match self.stage {
StageEnum::Headers => {
tx.clear::<tables::CanonicalHeaders>()?;
tx.clear::<tables::Headers>()?;
tx.clear::<tables::HeaderTerminalDifficulties>()?;
tx.clear::<tables::HeaderNumbers>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Headers.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
tx.clear::<tables::Transactions>()?;
tx.clear::<tables::TransactionBlocks>()?;
tx.clear::<tables::BlockOmmers>()?;
tx.clear::<tables::BlockWithdrawals>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Bodies.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
StageEnum::Senders => {
tx.clear::<tables::TransactionSenders>()?;
tx.put::<tables::StageCheckpoints>(
StageId::SenderRecovery.to_string(),
Default::default(),
)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
tx.clear::<tables::Receipts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Execution.to_string(),
Default::default(),
)?;
insert_genesis_state::<DatabaseEnv>(tx, self.chain.genesis())?;
}
StageEnum::AccountHashing => {
tx.clear::<tables::HashedAccounts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::AccountHashing.to_string(),
Default::default(),
)?;
}
StageEnum::StorageHashing => {
tx.clear::<tables::HashedStorages>()?;
tx.put::<tables::StageCheckpoints>(
StageId::StorageHashing.to_string(),
Default::default(),
)?;
}
StageEnum::Hashing => {
// Clear hashed accounts
tx.clear::<tables::HashedAccounts>()?;
tx.put::<tables::StageCheckpoints>(
StageId::AccountHashing.to_string(),
Default::default(),
)?;
tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;
// Clear hashed storages
tx.clear::<tables::HashedStorages>()?;
tx.put::<tables::StageCheckpoints>(
StageId::StorageHashing.to_string(),
Default::default(),
)?;
}
StageEnum::Merkle => {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
tx.put::<tables::StageCheckpoints>(
StageId::MerkleExecute.to_string(),
Default::default(),
)?;
tx.put::<tables::StageCheckpoints>(
StageId::MerkleUnwind.to_string(),
Default::default(),
)?;
tx.delete::<tables::StageCheckpointProgresses>(
StageId::MerkleExecute.to_string(),
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;
tx.put::<tables::StageCheckpoints>(
StageId::IndexAccountHistory.to_string(),
Default::default(),
)?;
tx.put::<tables::StageCheckpoints>(
StageId::IndexStorageHistory.to_string(),
Default::default(),
)?;
insert_genesis_history(&provider_rw, &self.chain.genesis)?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
tx.put::<tables::StageCheckpoints>(
StageId::TransactionLookup.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, &static_file_provider, self.chain)?;
}
}
static_file_provider.commit()?;
tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;
Ok::<_, eyre::Error>(())
})??;
static_file_provider.commit()?;
provider_rw.commit()?;
Ok(())
}

View File

@ -39,8 +39,6 @@ pub struct Collector<K, V>
where
K: Encode + Ord,
V: Compress,
<K as Encode>::Encoded: std::fmt::Debug,
<V as Compress>::Compressed: std::fmt::Debug,
{
/// Parent directory where to create ETL files
parent_dir: Option<PathBuf>,
@ -62,8 +60,6 @@ impl<K, V> Collector<K, V>
where
K: Key,
V: Value,
<K as Encode>::Encoded: Ord + std::fmt::Debug,
<V as Compress>::Compressed: Ord + std::fmt::Debug,
{
/// Create a new collector with some capacity.
///

View File

@ -875,10 +875,12 @@ impl NodeConfig {
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
prune_modes.account_history,
stage_config.etl.clone(),
))
.set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold,
prune_modes.storage_history,
stage_config.etl.clone(),
)),
)
.build(provider_factory, static_file_producer);

View File

@ -1,18 +1,22 @@
use super::{collect_history_indices, load_history_indices};
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_config::config::EtlConfig;
use reth_db::{
database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut,
};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment,
Address, PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment,
};
use reth_provider::{
AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader,
PruneCheckpointWriter,
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
};
use std::fmt::Debug;
use tracing::info;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`reth_db::tables::AccountsHistory`]
/// on index sharding take a look at [`tables::AccountsHistory`]
#[derive(Debug)]
pub struct IndexAccountHistoryStage {
/// Number of blocks after which the control
@ -20,18 +24,24 @@ pub struct IndexAccountHistoryStage {
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_mode: Option<PruneMode>,
/// ETL configuration
pub etl_config: EtlConfig,
}
impl IndexAccountHistoryStage {
/// Create new instance of [IndexAccountHistoryStage].
pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_mode }
pub fn new(
commit_threshold: u64,
prune_mode: Option<PruneMode>,
etl_config: EtlConfig,
) -> Self {
Self { commit_threshold, prune_mode, etl_config }
}
}
impl Default for IndexAccountHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000, prune_mode: None }
Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
}
}
@ -81,13 +91,37 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let indices = provider.changed_accounts_and_blocks_with_range(range.clone())?;
// Insert changeset to history index
provider.insert_account_history_index(indices)?;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
range = 0..=*input.next_block_range().end();
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
let collector =
collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
provider.tx_ref(),
range.clone(),
ShardedKey::new,
|(index, value)| (index, value.address),
&self.etl_config,
)?;
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
provider.tx_ref(),
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode,
|key| key.key,
)?;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
/// Unwind the stage.
@ -117,18 +151,17 @@ mod tests {
use reth_db::{
cursor::DbCursorRO,
models::{
sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey,
sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx,
StoredBlockBodyIndices,
},
tables,
transaction::{DbTx, DbTxMut},
transaction::DbTx,
BlockNumberList,
};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{address, Address, BlockNumber, B256};
use reth_primitives::{address, BlockNumber, B256};
use reth_provider::providers::StaticFileWriter;
use std::collections::BTreeMap;
@ -205,7 +238,7 @@ mod tests {
}
#[tokio::test]
async fn insert_index_to_empty() {
async fn insert_index_to_genesis() {
// init
let db = TestStageDB::default();
@ -217,14 +250,14 @@ mod tests {
// verify
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));
// unwind
unwind(&db, 3, 0);
// verify initial state
let table = db.table::<tables::AccountsHistory>().unwrap();
assert!(table.is_empty());
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
}
#[tokio::test]
@ -484,7 +517,11 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode }
Self::S {
commit_threshold: self.commit_threshold,
prune_mode: self.prune_mode,
etl_config: EtlConfig::default(),
}
}
}

View File

@ -1,17 +1,26 @@
use super::{collect_history_indices, load_history_indices};
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_config::config::EtlConfig;
use reth_db::{
database::Database,
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
table::Decode,
tables,
transaction::DbTxMut,
};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment,
};
use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader,
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
};
use std::fmt::Debug;
use tracing::info;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`reth_db::tables::StoragesHistory`].
/// on index sharding take a look at [`tables::StoragesHistory`].
#[derive(Debug)]
pub struct IndexStorageHistoryStage {
/// Number of blocks after which the control
@ -19,18 +28,24 @@ pub struct IndexStorageHistoryStage {
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_mode: Option<PruneMode>,
/// ETL configuration
pub etl_config: EtlConfig,
}
impl IndexStorageHistoryStage {
/// Create new instance of [IndexStorageHistoryStage].
pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_mode }
pub fn new(
commit_threshold: u64,
prune_mode: Option<PruneMode>,
etl_config: EtlConfig,
) -> Self {
Self { commit_threshold, prune_mode, etl_config }
}
}
impl Default for IndexStorageHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000, prune_mode: None }
Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
}
}
@ -80,12 +95,41 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let indices = provider.changed_storages_and_blocks_with_range(range.clone())?;
provider.insert_storage_history_index(indices)?;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
range = 0..=*input.next_block_range().end();
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices");
let collector =
collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
provider.tx_ref(),
BlockNumberAddress::range(range.clone()),
|AddressStorageKey((address, storage_key)), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
|(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))),
&self.etl_config,
)?;
info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
load_history_indices::<_, tables::StoragesHistory, _>(
provider.tx_ref(),
collector,
first_sync,
|AddressStorageKey((address, storage_key)), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
StorageShardedKey::decode,
|key| AddressStorageKey((key.address, key.sharded_key.key)),
)?;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
/// Unwind the stage.
@ -114,12 +158,10 @@ mod tests {
use reth_db::{
cursor::DbCursorRO,
models::{
sharded_key,
storage_sharded_key::{StorageShardedKey, NUM_OF_INDICES_IN_SHARD},
ShardedKey, StoredBlockBodyIndices,
sharded_key, storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey,
StoredBlockBodyIndices,
},
tables,
transaction::{DbTx, DbTxMut},
transaction::DbTx,
BlockNumberList,
};
use reth_interfaces::test_utils::{
@ -216,7 +258,7 @@ mod tests {
}
#[tokio::test]
async fn insert_index_to_empty() {
async fn insert_index_to_genesis() {
// init
let db = TestStageDB::default();
@ -228,14 +270,14 @@ mod tests {
// verify
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));
// unwind
unwind(&db, 5, 0);
// verify initial state
let table = db.table::<tables::StoragesHistory>().unwrap();
assert!(table.is_empty());
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
}
#[tokio::test]
@ -498,7 +540,11 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode }
Self::S {
commit_threshold: self.commit_threshold,
prune_mode: self.prune_mode,
etl_config: EtlConfig::default(),
}
}
}

View File

@ -30,9 +30,13 @@ pub use headers::*;
pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use sender_recovery::*;
pub use tx_lookup::*;
mod utils;
use utils::*;
#[cfg(test)]
mod tests {
use super::*;

View File

@ -0,0 +1,244 @@
//! Utils for `stages`.
use crate::StageError;
use reth_config::config::EtlConfig;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
models::sharded_key::NUM_OF_INDICES_IN_SHARD,
table::{Decompress, Table},
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_primitives::BlockNumber;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;
/// Number of distinct block numbers to accumulate in the in-memory cache before its contents are
/// pushed to the [`Collector`] and the cache is cleared.
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
/// [`Collector`].
///
/// ## Process
/// The function utilizes a `HashMap` cache with a structure of `PartialKey` (`P`) (Address or
/// Address.StorageKey) to `BlockNumberList`. When the cache exceeds its capacity, its contents are
/// moved to a [`Collector`]. Here, each entry's key is a concatenation of `PartialKey` and the
/// highest block number in its list.
///
/// ## Example
/// 1. Initial Cache State: `{ Address1: [1,2,3], ... }`
/// 2. Cache is flushed to the `Collector`.
/// 3. Updated Cache State: `{ Address1: [100,300], ... }`
/// 4. Cache is flushed again.
///
/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
///
/// ## Parameters
/// * `tx` - read transaction used to walk the changeset table.
/// * `range` - changeset key range to index.
/// * `sharded_key_factory` - builds a history-table key from a partial key and its highest block.
/// * `partial_key_factory` - extracts `(block_number, partial_key)` from a changeset entry.
/// * `etl_config` - file size / directory configuration for the backing [`Collector`].
pub(crate) fn collect_history_indices<TX, CS, H, P>(
    tx: &TX,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    TX: DbTxMut + DbTx,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = tx.cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::new();

    // Drains the cache into the collector: each (partial key -> block list) entry is keyed by
    // the partial key sharded with the highest block number of its list.
    let mut collect = |cache: &HashMap<P, Vec<u64>>| {
        for (key, indice_list) in cache {
            // Non-empty by construction: an entry is only created when a block is pushed.
            let last = indice_list.last().expect("qed");
            collector.insert(
                sharded_key_factory(*key, *last),
                BlockNumberList::new_pre_sorted(indice_list),
            )?;
        }
        Ok::<(), StageError>(())
    };

    // observability: log progress roughly every 1% of total changeset entries
    let total_changesets = tx.entries::<CS>()?;
    let interval = (total_changesets / 100).max(1);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx % interval == 0 && total_changesets > 100 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks. The counter is
        // advanced once per distinct block number, not per changeset entry.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&cache)?;
                cache.clear();
                flush_counter = 0;
            }
        }
    }

    // Flush whatever remains cached for the tail of the range.
    collect(&cache)?;

    Ok(collector)
}
/// Given a [`Collector`] created by [`collect_history_indices`] it iterates all entries, loading
/// the indices into the database in shards.
///
/// ## Process
/// Iterates over elements, grouping indices by their partial keys (e.g., `Address` or
/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
/// key shard is stored.
///
/// ## Parameters
/// * `tx` - write transaction the history shards are loaded into.
/// * `collector` - sorted (sharded key -> block list) entries produced by the collect step.
/// * `append_only` - when `true` (first sync against a cleared table) shards are appended;
///   otherwise the existing `u64::MAX` shard of each partial key is merged in first.
/// * `sharded_key_factory` - builds a history-table key from a partial key and a block number.
/// * `decode_key` - decodes a raw collector key back into a history-table key.
/// * `get_partial` - projects a history-table key onto its partial key.
pub(crate) fn load_history_indices<TX, H, P>(
    tx: &TX,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    TX: DbTxMut + DbTx,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = tx.cursor_write::<H>()?;
    let mut current_partial = P::default();
    let mut current_list = Vec::<u64>::new();

    // observability: log progress roughly every 1% of total collector entries
    let total_entries = collector.len();
    let interval = (total_entries / 100).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index % interval == 0 && total_entries > 100 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // AccountsHistory: `Address`.
        // StorageHistory: `Address.StorageKey`.
        let partial_key = get_partial(sharded_key);

        if current_partial != partial_key {
            // We have reached the end of this subset of keys so
            // we need to flush its last index shard. (On the very first iteration
            // `current_list` is empty and this flush is a no-op.)
            load_indices(
                &mut write_cursor,
                current_partial,
                &mut current_list,
                &sharded_key_factory,
                append_only,
                LoadMode::Flush,
            )?;

            current_partial = partial_key;
            current_list.clear();

            // If it's not the first sync, there might be an existing shard already, so we need to
            // merge it with the one coming from the collector
            if !append_only {
                if let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
                {
                    current_list.extend(last_database_shard.iter());
                }
            }
        }

        current_list.extend(new_list.iter());
        // Write any full shards; the trailing partial shard stays in `current_list`.
        load_indices(
            &mut write_cursor,
            current_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // There will be one remaining shard that needs to be flushed to DB.
    load_indices(
        &mut write_cursor,
        current_partial,
        &mut current_list,
        &sharded_key_factory,
        append_only,
        LoadMode::Flush,
    )?;

    Ok(())
}
/// Shard and insert the indice list according to [`LoadMode`] and its length.
///
/// The list is split into chunks of at most `NUM_OF_INDICES_IN_SHARD` block numbers. All full
/// chunks are written under a key sharded by their highest block number, while the final chunk is
/// either handed back in `list` ([`LoadMode::KeepLast`]) so the caller can keep appending to it,
/// or written under the `u64::MAX` sentinel ([`LoadMode::Flush`]), marking it as the terminal
/// shard of this partial key.
///
/// With `append_only` set, shards are appended via the cursor (assumes keys arrive in ascending
/// order against a cleared table — see callers); otherwise they are upserted.
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    // Nothing to do unless the list has outgrown one shard or the caller forces a flush.
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        let chunks = list
            .chunks(NUM_OF_INDICES_IN_SHARD)
            .map(|chunks| chunks.to_vec())
            .collect::<Vec<Vec<u64>>>();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // Last chunk in `KeepLast` mode: return it to the caller instead of writing it.
                *list = chunk;
            } else {
                if iter.peek().is_none() {
                    // The terminal shard of a partial key is always keyed with `u64::MAX`.
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                if append_only {
                    cursor.append(key, value)?;
                } else {
                    cursor.upsert(key, value)?;
                }
            }
        }
    }
    Ok(())
}
/// Mode on how to load index shards into the database.
pub(crate) enum LoadMode {
    /// Keep the last shard in memory and don't flush it to the database.
    KeepLast,
    /// Flush all shards into the database.
    Flush,
}

impl LoadMode {
    /// Whether every shard — including the trailing partial one — should be written out.
    fn is_flush(&self) -> bool {
        match self {
            Self::Flush => true,
            Self::KeepLast => false,
        }
    }
}

View File

@ -16,7 +16,8 @@ pub trait Compress: Send + Sync + Sized + Debug {
+ Into<Vec<u8>>
+ Default
+ Send
+ Sync;
+ Sync
+ Debug;
/// If the type cannot be compressed, return its inner reference as `Some(self.as_ref())`
fn uncompressable_ref(&self) -> Option<&[u8]> {
@ -48,7 +49,7 @@ pub trait Decompress: Send + Sync + Sized + Debug {
/// Trait that will transform the data to be saved in the DB.
pub trait Encode: Send + Sync + Sized + Debug {
/// Encoded type.
type Encoded: AsRef<[u8]> + Into<Vec<u8>> + Send + Sync;
type Encoded: AsRef<[u8]> + Into<Vec<u8>> + Send + Sync + Ord + Debug;
/// Encodes data going into the database.
fn encode(self) -> Self::Encoded;

View File

@ -8,7 +8,7 @@ use crate::{
DatabaseError,
};
use reth_codecs::{derive_arbitrary, Compact};
use reth_primitives::{Account, Address, BlockNumber, Buf};
use reth_primitives::{Account, Address, BlockNumber, Buf, StorageKey};
use serde::{Deserialize, Serialize};
/// Account as it is saved inside [`AccountChangeSets`][crate::tables::AccountChangeSets].
@ -121,7 +121,40 @@ impl Decode for BlockNumberAddress {
}
}
impl_fixed_arbitrary!(BlockNumberAddress, 28);
/// [`Address`] concatenated with [`StorageKey`]. Used by `reth_etl` and history stages.
///
/// Since it's used as a key, it isn't compressed when encoding it: the encoding is the fixed
/// 52-byte concatenation of the 20-byte address followed by the 32-byte storage key.
#[derive(
    Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash,
)]
pub struct AddressStorageKey(pub (Address, StorageKey));
impl Encode for AddressStorageKey {
type Encoded = [u8; 52];
fn encode(self) -> Self::Encoded {
let address = self.0 .0;
let storage_key = self.0 .1;
let mut buf = [0u8; 52];
buf[..20].copy_from_slice(address.as_slice());
buf[20..].copy_from_slice(storage_key.as_slice());
buf
}
}
impl Decode for AddressStorageKey {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let value = value.as_ref();
let address = Address::from_slice(&value[..20]);
let storage_key = StorageKey::from_slice(&value[20..]);
Ok(AddressStorageKey((address, storage_key)))
}
}
impl_fixed_arbitrary!((BlockNumberAddress, 28), (AddressStorageKey, 52));
#[cfg(test)]
mod tests {
@ -153,4 +186,29 @@ mod tests {
let key = BlockNumberAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
assert_eq!(bytes, Encode::encode(key));
}
#[test]
fn test_address_storage_key() {
    // Round-trip: encoding and then decoding must reproduce the original key.
    let storage_key = StorageKey::random();
    let address = Address::from_str("ba5e000000000000000000000000000000000000").unwrap();
    let key = AddressStorageKey((address, storage_key));

    // Expected layout: 20 address bytes followed by 32 storage-key bytes.
    let mut expected = [0u8; 52];
    expected[..20].copy_from_slice(address.as_slice());
    expected[20..].copy_from_slice(storage_key.as_slice());

    let encoded = Encode::encode(key);
    assert_eq!(encoded, expected);

    let decoded: AddressStorageKey = Decode::decode(encoded).unwrap();
    assert_eq!(decoded, key);
}
#[test]
fn test_address_storage_key_rand() {
    // The `Arbitrary` impl (generated by `impl_fixed_arbitrary!`) must be the inverse of
    // `Encode::encode` for random 52-byte inputs.
    let mut bytes = [0u8; 52];
    thread_rng().fill(bytes.as_mut_slice());

    let key = AddressStorageKey::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
    assert_eq!(bytes, Encode::encode(key));
}
}

View File

@ -9,34 +9,35 @@ use std::borrow::Cow;
#[macro_export]
/// Implements the `Arbitrary` trait for types with fixed array types.
macro_rules! impl_fixed_arbitrary {
($name:ident, $size:tt) => {
($(($name:ident, $size:expr)),*) => {
#[cfg(any(test, feature = "arbitrary"))]
use arbitrary::{Arbitrary, Unstructured};
$(
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> Arbitrary<'a> for $name {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self, arbitrary::Error> {
let mut buffer = vec![0; $size];
u.fill_buffer(buffer.as_mut_slice())?;
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> Arbitrary<'a> for $name {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self, arbitrary::Error> {
let mut buffer = vec![0; $size];
u.fill_buffer(buffer.as_mut_slice())?;
Decode::decode(buffer).map_err(|_| arbitrary::Error::IncorrectFormat)
Decode::decode(buffer).map_err(|_| arbitrary::Error::IncorrectFormat)
}
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl proptest::prelude::Arbitrary for $name {
type Parameters = ();
type Strategy = proptest::strategy::Map<
proptest::collection::VecStrategy<<u8 as proptest::arbitrary::Arbitrary>::Strategy>,
fn(Vec<u8>) -> Self,
>;
#[cfg(any(test, feature = "arbitrary"))]
impl proptest::prelude::Arbitrary for $name {
type Parameters = ();
type Strategy = proptest::strategy::Map<
proptest::collection::VecStrategy<<u8 as proptest::arbitrary::Arbitrary>::Strategy>,
fn(Vec<u8>) -> Self,
>;
fn arbitrary_with(args: Self::Parameters) -> Self::Strategy {
use proptest::strategy::Strategy;
proptest::collection::vec(proptest::arbitrary::any_with::<u8>(args), $size)
.prop_map(move |vec| Decode::decode(vec).unwrap())
fn arbitrary_with(args: Self::Parameters) -> Self::Strategy {
use proptest::strategy::Strategy;
proptest::collection::vec(proptest::arbitrary::any_with::<u8>(args), $size)
.prop_map(move |vec| Decode::decode(vec).unwrap())
}
}
}
)+
};
}