From 0116b80414170671740b67c30d42b6dcd70df75d Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Thu, 26 Oct 2023 13:01:29 +0100
Subject: [PATCH] feat: add `reth db snapshot transactions | receipts` commands
 (#5007)

Co-authored-by: Alexey Shekhirin
---
 Cargo.lock                                   |   2 +
 bin/reth/src/db/snapshots/bench.rs           |  26 ++-
 bin/reth/src/db/snapshots/headers.rs         |  30 ++-
 bin/reth/src/db/snapshots/mod.rs             |  82 ++++----
 bin/reth/src/db/snapshots/receipts.rs        | 176 +++++++++++++++++
 bin/reth/src/db/snapshots/transactions.rs    | 179 ++++++++++++++++++
 crates/interfaces/src/provider.rs            |  10 +-
 crates/primitives/src/lib.rs                 |   2 +-
 crates/primitives/src/snapshot/mod.rs        |   2 +-
 crates/primitives/src/snapshot/segment.rs    |  29 +++
 crates/primitives/src/transaction/mod.rs     |   5 +-
 crates/snapshot/src/segments/headers.rs      |  47 ++---
 crates/snapshot/src/segments/mod.rs          |  52 +++--
 crates/snapshot/src/segments/receipts.rs     |  74 ++++++++
 crates/snapshot/src/segments/transactions.rs |  74 ++++++++
 crates/stages/src/stages/tx_lookup.rs        |  56 +-----
 crates/storage/db/Cargo.toml                 |   2 +
 crates/storage/db/src/snapshot.rs            |   6 +-
 crates/storage/provider/src/lib.rs           |   2 +-
 .../src/providers/database/provider.rs       |  66 ++++++-
 .../provider/src/providers/snapshot.rs       | 145 ++++++++++++--
 crates/storage/provider/src/traits/mod.rs    |   2 +-
 .../provider/src/traits/transactions.rs      |  34 +++-
 23 files changed, 924 insertions(+), 179 deletions(-)
 create mode 100644 bin/reth/src/db/snapshots/receipts.rs
 create mode 100644 bin/reth/src/db/snapshots/transactions.rs
 create mode 100644 crates/snapshot/src/segments/receipts.rs
 create mode 100644 crates/snapshot/src/segments/transactions.rs

diff --git a/Cargo.lock b/Cargo.lock
index a0c2d0672..f7a6f9d39 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5833,6 +5833,7 @@ dependencies = [
  "futures",
  "heapless",
  "iai",
+ "itertools 0.11.0",
  "metrics",
  "modular-bitfield",
  "page_size",
@@ -5844,6 +5845,7 @@ dependencies = [
  "proptest",
  "proptest-derive",
  "rand 0.8.5",
+ "rayon",
  "reth-codecs",
  "reth-interfaces",
  "reth-libmdbx",
diff --git a/bin/reth/src/db/snapshots/bench.rs b/bin/reth/src/db/snapshots/bench.rs
index edcfe6fa5..47c5ec2fa 100644
--- a/bin/reth/src/db/snapshots/bench.rs
+++ b/bin/reth/src/db/snapshots/bench.rs
@@ -4,7 +4,7 @@ use reth_primitives::{
     ChainSpec, SnapshotSegment,
 };
 use reth_provider::{DatabaseProviderRO, ProviderFactory};
-use std::{sync::Arc, time::Instant};
+use std::{fmt::Debug, sync::Arc, time::Instant};
 
 #[derive(Debug)]
 pub(crate) enum BenchKind {
@@ -14,7 +14,7 @@ pub(crate) enum BenchKind {
     RandomHash,
 }
 
-pub(crate) fn bench<F1, F2>(
+pub(crate) fn bench<F1, F2, R>(
     bench_kind: BenchKind,
     db: (DatabaseEnvRO, Arc<ChainSpec>),
     segment: SnapshotSegment,
@@ -24,28 +24,34 @@ pub(crate) fn bench<F1, F2, R>(
     database_method: F2,
 ) -> eyre::Result<()>
 where
-    F1: FnMut() -> eyre::Result<()>,
-    F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
+    F1: FnMut() -> eyre::Result<R>,
+    F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<R>,
+    R: Debug + PartialEq,
 {
     let (db, chain) = db;
 
     println!();
     println!("############");
     println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
-    {
+    let snap_result = {
         let start = Instant::now();
-        snapshot_method()?;
+        let result = snapshot_method()?;
         let end = start.elapsed().as_micros();
         println!("# snapshot {bench_kind:?} | {end} μs");
-    }
-    {
+        result
+    };
+
+    let db_result = {
         let factory = ProviderFactory::new(db, chain);
         let provider = factory.provider()?;
 
         let start = Instant::now();
-        database_method(provider)?;
+        let result = database_method(provider)?;
         let end = start.elapsed().as_micros();
         println!("# database {bench_kind:?} | {end} μs");
-    }
+        result
+    };
+
+    assert_eq!(snap_result, db_result);
 
     Ok(())
 }
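The reworked `bench` harness above threads a result type `R: Debug + PartialEq` through both closures, so every timing run now doubles as an equivalence check between the snapshot read path and the database read path. A minimal, self-contained sketch of the same pattern — names like `time_pair` are illustrative, not part of the patch:

use std::time::Instant;

/// Times two closures that must agree on their result, as the patched `bench` does.
fn time_pair<R: std::fmt::Debug + PartialEq>(
    label: &str,
    mut snapshot: impl FnMut() -> R,
    database: impl Fn() -> R,
) -> R {
    let start = Instant::now();
    let a = snapshot();
    println!("# snapshot {label} | {} μs", start.elapsed().as_micros());

    let start = Instant::now();
    let b = database();
    println!("# database {label} | {} μs", start.elapsed().as_micros());

    // Equivalence check: a fast but wrong read path must fail loudly.
    assert_eq!(a, b);
    a
}

fn main() {
    let sum = time_pair("walk", || (0u64..1_000_000).sum::<u64>(), || 499_999_500_000);
    println!("{sum}");
}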
diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs
index 4fc60f3cf..7a9e81356 100644
--- a/bin/reth/src/db/snapshots/headers.rs
+++ b/bin/reth/src/db/snapshots/headers.rs
@@ -2,23 +2,22 @@ use super::{
     bench::{bench, BenchKind},
     Command,
 };
-use crate::utils::DbTool;
 use rand::{seq::SliceRandom, Rng};
-use reth_db::{database::Database, open_db_read_only, table::Decompress, DatabaseEnvRO};
+use reth_db::{database::Database, open_db_read_only, table::Decompress};
 use reth_interfaces::db::LogLevel;
 use reth_nippy_jar::NippyJar;
 use reth_primitives::{
     snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
     ChainSpec, Header, SnapshotSegment,
 };
-use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
+use reth_provider::{DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory};
 use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
 use std::{path::Path, sync::Arc};
 
 impl Command {
-    pub(crate) fn generate_headers_snapshot(
+    pub(crate) fn generate_headers_snapshot<DB: Database>(
         &self,
-        tool: &DbTool<'_, DatabaseEnvRO>,
+        provider: &DatabaseProviderRO<'_, DB>,
         compression: Compression,
         inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
@@ -31,7 +30,7 @@ impl Command {
                 Filters::WithoutFilters
             },
         );
-        segment.snapshot(&tool.db.tx()?, self.from..=(self.from + self.block_interval - 1))?;
+        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
 
         Ok(())
     }
@@ -56,7 +55,7 @@ impl Command {
         let mut row_indexes = range.clone().collect::<Vec<_>>();
         let mut rng = rand::thread_rng();
         let mut dictionaries = None;
-        let mut jar = NippyJar::load_without_header(&get_snapshot_segment_file_name(
+        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
             SnapshotSegment::Headers,
             filters,
             compression,
@@ -114,18 +113,16 @@ impl Command {
                 filters,
                 compression,
                 || {
-                    Header::decompress(
+                    Ok(Header::decompress(
                         cursor
                             .row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
                             .ok_or(ProviderError::HeaderNotFound((num as u64).into()))?[0],
-                    )?;
-                    Ok(())
+                    )?)
                 },
                 |provider| {
-                    provider
+                    Ok(provider
                         .header_by_number(num as u64)?
-                        .ok_or(ProviderError::HeaderNotFound((num as u64).into()))?;
-                    Ok(())
+                        .ok_or(ProviderError::HeaderNotFound((num as u64).into()))?)
                 },
             )?;
         }
@@ -154,13 +151,12 @@ impl Command {
                     // Might be a false positive, so in the real world we have to validate it
                     assert_eq!(header.hash_slow(), header_hash);
 
-                    Ok(())
+                    Ok(header)
                 },
                 |provider| {
-                    provider
+                    Ok(provider
                         .header(&header_hash)?
-                        .ok_or(ProviderError::HeaderNotFound(header_hash.into()))?;
-                    Ok(())
+                        .ok_or(ProviderError::HeaderNotFound(header_hash.into()))?)
                 },
             )?;
         }
diff --git a/bin/reth/src/db/snapshots/mod.rs b/bin/reth/src/db/snapshots/mod.rs
index afc2b0ce8..1e663321e 100644
--- a/bin/reth/src/db/snapshots/mod.rs
+++ b/bin/reth/src/db/snapshots/mod.rs
@@ -1,44 +1,26 @@
-use crate::{db::genesis_value_parser, utils::DbTool};
 use clap::Parser;
 use itertools::Itertools;
-use reth_db::open_db_read_only;
+use reth_db::{open_db_read_only, DatabaseEnvRO};
 use reth_interfaces::db::LogLevel;
 use reth_nippy_jar::{
     compression::{DecoderDictionary, Decompressor},
     NippyJar,
 };
 use reth_primitives::{
-    snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
+    snapshot::{Compression, InclusionFilter, PerfectHashingFunction, SegmentHeader},
     BlockNumber, ChainSpec, SnapshotSegment,
 };
-use reth_provider::providers::SnapshotProvider;
+use reth_provider::{providers::SnapshotProvider, ProviderFactory};
 use std::{path::Path, sync::Arc};
 
 mod bench;
 mod headers;
+mod receipts;
+mod transactions;
 
 #[derive(Parser, Debug)]
 /// Arguments for the `reth db snapshot` command.
 pub struct Command {
-    /// The chain this node is running.
-    ///
-    /// Possible values are either a built-in chain or the path to a chain specification file.
-    ///
-    /// Built-in chains:
-    /// - mainnet
-    /// - goerli
-    /// - sepolia
-    /// - holesky
-    #[arg(
-        long,
-        value_name = "CHAIN_OR_PATH",
-        verbatim_doc_comment,
-        default_value = "mainnet",
-        value_parser = genesis_value_parser,
-        global = true,
-    )]
-    chain: Arc<ChainSpec>,
-
     /// Snapshot segments to generate.
     segments: Vec<SnapshotSegment>,
@@ -87,19 +69,33 @@ impl Command {
     {
         let db = open_db_read_only(db_path, None)?;
-        let tool = DbTool::new(&db, chain.clone())?;
+        let factory = ProviderFactory::new(db, chain.clone());
+        let provider = factory.provider()?;
 
         if !self.only_bench {
             for ((mode, compression), phf) in all_combinations.clone() {
                 match mode {
-                    SnapshotSegment::Headers => self.generate_headers_snapshot(
-                        &tool,
-                        *compression,
-                        InclusionFilter::Cuckoo,
-                        *phf,
-                    )?,
-                    SnapshotSegment::Transactions => todo!(),
-                    SnapshotSegment::Receipts => todo!(),
+                    SnapshotSegment::Headers => self
+                        .generate_headers_snapshot::<DatabaseEnvRO>(
+                            &provider,
+                            *compression,
+                            InclusionFilter::Cuckoo,
+                            *phf,
+                        )?,
+                    SnapshotSegment::Transactions => self
+                        .generate_transactions_snapshot::<DatabaseEnvRO>(
+                            &provider,
+                            *compression,
+                            InclusionFilter::Cuckoo,
+                            *phf,
+                        )?,
+                    SnapshotSegment::Receipts => self
+                        .generate_receipts_snapshot::<DatabaseEnvRO>(
+                            &provider,
+                            *compression,
+                            InclusionFilter::Cuckoo,
+                            *phf,
+                        )?,
                 }
             }
         }
@@ -116,8 +112,22 @@ impl Command {
                         InclusionFilter::Cuckoo,
                         *phf,
                     )?,
-                    SnapshotSegment::Transactions => todo!(),
-                    SnapshotSegment::Receipts => todo!(),
+                    SnapshotSegment::Transactions => self.bench_transactions_snapshot(
+                        db_path,
+                        log_level,
+                        chain.clone(),
+                        *compression,
+                        InclusionFilter::Cuckoo,
+                        *phf,
+                    )?,
+                    SnapshotSegment::Receipts => self.bench_receipts_snapshot(
+                        db_path,
+                        log_level,
+                        chain.clone(),
+                        *compression,
+                        InclusionFilter::Cuckoo,
+                        *phf,
+                    )?,
                 }
             }
         }
@@ -129,7 +139,7 @@ impl Command {
     /// [`DecoderDictionary`] and [`Decompressor`] if necessary.
     fn prepare_jar_provider<'a>(
         &self,
-        jar: &'a mut NippyJar,
+        jar: &'a mut NippyJar<SegmentHeader>,
         dictionaries: &'a mut Option<Vec<DecoderDictionary<'static>>>,
     ) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
         let mut decompressors: Vec<Decompressor<'a>> = vec![];
@@ -140,6 +150,6 @@ impl Command {
             }
         }
 
-        Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from }, decompressors))
+        Ok((SnapshotProvider { jar: &*jar }, decompressors))
     }
 }
diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs
new file mode 100644
index 000000000..b6b472170
--- /dev/null
+++ b/bin/reth/src/db/snapshots/receipts.rs
@@ -0,0 +1,176 @@
+use super::{
+    bench::{bench, BenchKind},
+    Command, Compression, PerfectHashingFunction,
+};
+use rand::{seq::SliceRandom, Rng};
+use reth_db::{database::Database, open_db_read_only, table::Decompress};
+use reth_interfaces::db::LogLevel;
+use reth_nippy_jar::NippyJar;
+use reth_primitives::{
+    snapshot::{Filters, InclusionFilter},
+    ChainSpec, Receipt, SnapshotSegment,
+};
+use reth_provider::{
+    DatabaseProviderRO, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider,
+    TransactionsProviderExt,
+};
+use reth_snapshot::{
+    segments,
+    segments::{get_snapshot_segment_file_name, Segment},
+};
+use std::{path::Path, sync::Arc};
+
+impl Command {
+    pub(crate) fn generate_receipts_snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        compression: Compression,
+        inclusion_filter: InclusionFilter,
+        phf: PerfectHashingFunction,
+    ) -> eyre::Result<()> {
+        let segment = segments::Receipts::new(
+            compression,
+            if self.with_filters {
+                Filters::WithFilters(inclusion_filter, phf)
+            } else {
+                Filters::WithoutFilters
+            },
+        );
+        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
+
+        Ok(())
+    }
+
+    pub(crate) fn bench_receipts_snapshot(
+        &self,
+        db_path: &Path,
+        log_level: Option<LogLevel>,
+        chain: Arc<ChainSpec>,
+        compression: Compression,
+        inclusion_filter: InclusionFilter,
+        phf: PerfectHashingFunction,
+    ) -> eyre::Result<()> {
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let block_range = self.from..=(self.from + self.block_interval - 1);
+
+        let mut rng = rand::thread_rng();
+        let mut dictionaries = None;
+        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
+            SnapshotSegment::Receipts,
+            filters,
+            compression,
+            &block_range,
+        ))?;
+
+        let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
+            .provider()?
+            .transaction_range_by_block_range(block_range)?;
+
+        let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
+
+        let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
+        let mut cursor = if !decompressors.is_empty() {
+            provider.cursor_with_decompressors(decompressors)
+        } else {
+            provider.cursor()
+        };
+
+        for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
+            bench(
+                bench_kind,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Receipts,
+                filters,
+                compression,
+                || {
+                    for num in row_indexes.iter() {
+                        Receipt::decompress(
+                            cursor
+                                .row_by_number_with_cols::<0b1, 1>(
+                                    (num - tx_range.start()) as usize,
+                                )?
+                                .ok_or(ProviderError::ReceiptNotFound((*num).into()))?[0],
+                        )?;
+                        // TODO: replace with below when eventually SnapshotProvider re-uses cursor
+                        // provider.receipt(num as
+                        // u64)?.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
+                    }
+                    Ok(())
+                },
+                |provider| {
+                    for num in row_indexes.iter() {
+                        provider
+                            .receipt(*num)?
+                            .ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
+                    }
+                    Ok(())
+                },
+            )?;
+
+            // For random walk
+            row_indexes.shuffle(&mut rng);
+        }
+
+        // BENCHMARK QUERYING A RANDOM RECEIPT BY NUMBER
+        {
+            let num = row_indexes[rng.gen_range(0..row_indexes.len())];
+            bench(
+                BenchKind::RandomOne,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Receipts,
+                filters,
+                compression,
+                || {
+                    Ok(Receipt::decompress(
+                        cursor
+                            .row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
+                            .ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?[0],
+                    )?)
+                },
+                |provider| {
+                    Ok(provider
+                        .receipt(num as u64)?
+                        .ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?)
+                },
+            )?;
+        }
+
+        // BENCHMARK QUERYING A RANDOM RECEIPT BY HASH
+        {
+            let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
+            let tx_hash =
+                ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
+                    .transaction_by_id(num)?
+                    .ok_or(ProviderError::ReceiptNotFound(num.into()))?
+                    .hash();
+
+            bench(
+                BenchKind::RandomHash,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Receipts,
+                filters,
+                compression,
+                || {
+                    let receipt = Receipt::decompress(
+                        cursor
+                            .row_by_key_with_cols::<0b1, 1>(tx_hash.as_slice())?
+                            .ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?[0],
+                    )?;
+
+                    Ok(receipt)
+                },
+                |provider| {
+                    Ok(provider
+                        .receipt_by_hash(tx_hash)?
+                        .ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?)
+                },
+            )?;
+        }
+        Ok(())
+    }
+}
diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs
new file mode 100644
index 000000000..8c4544386
--- /dev/null
+++ b/bin/reth/src/db/snapshots/transactions.rs
@@ -0,0 +1,179 @@
+use super::{
+    bench::{bench, BenchKind},
+    Command, Compression, PerfectHashingFunction,
+};
+use rand::{seq::SliceRandom, Rng};
+use reth_db::{database::Database, open_db_read_only, table::Decompress};
+use reth_interfaces::db::LogLevel;
+use reth_nippy_jar::NippyJar;
+use reth_primitives::{
+    snapshot::{Filters, InclusionFilter},
+    ChainSpec, SnapshotSegment, TransactionSignedNoHash,
+};
+use reth_provider::{
+    DatabaseProviderRO, ProviderError, ProviderFactory, TransactionsProvider,
+    TransactionsProviderExt,
+};
+use reth_snapshot::{
+    segments,
+    segments::{get_snapshot_segment_file_name, Segment},
+};
+use std::{path::Path, sync::Arc};
+
+impl Command {
+    pub(crate) fn generate_transactions_snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        compression: Compression,
+        inclusion_filter: InclusionFilter,
+        phf: PerfectHashingFunction,
+    ) -> eyre::Result<()> {
+        let segment = segments::Transactions::new(
+            compression,
+            if self.with_filters {
+                Filters::WithFilters(inclusion_filter, phf)
+            } else {
+                Filters::WithoutFilters
+            },
+        );
+        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
+
+        Ok(())
+    }
+
+    pub(crate) fn bench_transactions_snapshot(
+        &self,
+        db_path: &Path,
+        log_level: Option<LogLevel>,
+        chain: Arc<ChainSpec>,
+        compression: Compression,
+        inclusion_filter: InclusionFilter,
+        phf: PerfectHashingFunction,
+    ) -> eyre::Result<()> {
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let block_range = self.from..=(self.from + self.block_interval - 1);
+
+        let mut rng = rand::thread_rng();
+        let mut dictionaries = None;
+        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
+            SnapshotSegment::Transactions,
+            filters,
+            compression,
+            &block_range,
+        ))?;
+
+        let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
+            .provider()?
+            .transaction_range_by_block_range(block_range)?;
+
+        let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
+
+        let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
+        let mut cursor = if !decompressors.is_empty() {
+            provider.cursor_with_decompressors(decompressors)
+        } else {
+            provider.cursor()
+        };
+
+        for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
+            bench(
+                bench_kind,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Transactions,
+                filters,
+                compression,
+                || {
+                    for num in row_indexes.iter() {
+                        TransactionSignedNoHash::decompress(
+                            cursor
+                                .row_by_number_with_cols::<0b1, 1>(
+                                    (num - tx_range.start()) as usize,
+                                )?
+                                .ok_or(ProviderError::TransactionNotFound((*num).into()))?[0],
+                        )?
+                        .with_hash();
+                        // TODO: replace with below when eventually SnapshotProvider re-uses cursor
+                        // provider.transaction_by_id(num as
+                        // u64)?.ok_or(ProviderError::TransactionNotFound((*num).into()))?;
+                    }
+                    Ok(())
+                },
+                |provider| {
+                    for num in row_indexes.iter() {
+                        provider
+                            .transaction_by_id(*num)?
+                            .ok_or(ProviderError::TransactionNotFound((*num).into()))?;
+                    }
+                    Ok(())
+                },
+            )?;
+
+            // For random walk
+            row_indexes.shuffle(&mut rng);
+        }
+
+        // BENCHMARK QUERYING A RANDOM TRANSACTION BY NUMBER
+        {
+            let num = row_indexes[rng.gen_range(0..row_indexes.len())];
+            bench(
+                BenchKind::RandomOne,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Transactions,
+                filters,
+                compression,
+                || {
+                    Ok(TransactionSignedNoHash::decompress(
+                        cursor
+                            .row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
+                            .ok_or(ProviderError::TransactionNotFound((num as u64).into()))?[0],
+                    )?
+                    .with_hash())
+                },
+                |provider| {
+                    Ok(provider
+                        .transaction_by_id(num as u64)?
+                        .ok_or(ProviderError::TransactionNotFound((num as u64).into()))?)
+                },
+            )?;
+        }
+
+        // BENCHMARK QUERYING A RANDOM TRANSACTION BY HASH
+        {
+            let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
+            let transaction_hash =
+                ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
+                    .transaction_by_id(num)?
+                    .ok_or(ProviderError::TransactionNotFound(num.into()))?
+                    .hash();
+
+            bench(
+                BenchKind::RandomHash,
+                (open_db_read_only(db_path, log_level)?, chain.clone()),
+                SnapshotSegment::Transactions,
+                filters,
+                compression,
+                || {
+                    let transaction = TransactionSignedNoHash::decompress(
+                        cursor
+                            .row_by_key_with_cols::<0b1, 1>(transaction_hash.as_slice())?
+                            .ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?
+                            [0],
+                    )?;
+
+                    // Might be a false positive, so in the real world we have to validate it
+                    Ok(transaction.with_hash())
+                },
+                |provider| {
+                    Ok(provider
+                        .transaction_by_hash(transaction_hash)?
+                        .ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?)
+                },
+            )?;
+        }
+        Ok(())
+    }
+}
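Both by-hash benchmarks above rely on the jar's perfect-hash index, and the `// Might be a false positive` comments mark the contract that comes with it: a PHF maps any key to some row, so a query for an absent or colliding key can land on the wrong entry, and the caller must validate by recomputing the row's own key (here, by re-hashing the decompressed transaction). A toy illustration of that validate-after-lookup contract — `Phf` and its modulo lookup are stand-ins, not the NippyJar implementation:

/// Toy lookup structure that, like a PHF index, can return a wrong row.
struct Phf {
    rows: Vec<(u64, String)>, // (key, payload)
}

impl Phf {
    // A real PHF computes a collision-free index for known keys;
    // modulo is a stand-in that shows the failure mode for unknown keys.
    fn lookup(&self, key: u64) -> &(u64, String) {
        &self.rows[(key as usize) % self.rows.len()]
    }

    fn get_validated(&self, key: u64) -> Option<&str> {
        let (stored_key, payload) = self.lookup(key);
        // Validation step: reject false positives by re-checking the key.
        (*stored_key == key).then_some(payload.as_str())
    }
}

fn main() {
    let phf = Phf { rows: vec![(0, "a".into()), (1, "b".into())] };
    assert_eq!(phf.get_validated(1), Some("b"));
    assert_eq!(phf.get_validated(3), None); // collides with row 1, rejected
}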
diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs
index ec86cf15e..71b1371f3 100644
--- a/crates/interfaces/src/provider.rs
+++ b/crates/interfaces/src/provider.rs
@@ -1,4 +1,6 @@
-use reth_primitives::{Address, BlockHash, BlockHashOrNumber, BlockNumber, TxNumber, B256};
+use reth_primitives::{
+    Address, BlockHash, BlockHashOrNumber, BlockNumber, TxHashOrNumber, TxNumber, B256,
+};
 
 /// Bundled errors variants thrown by various providers.
 #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
@@ -40,6 +42,12 @@ pub enum ProviderError {
     /// when required header related data was not found but was required.
     #[error("no header found for {0:?}")]
     HeaderNotFound(BlockHashOrNumber),
+    /// The specific transaction is missing.
+    #[error("no transaction found for {0:?}")]
+    TransactionNotFound(TxHashOrNumber),
+    /// The specific receipt is missing.
+    #[error("no receipt found for {0:?}")]
+    ReceiptNotFound(TxHashOrNumber),
     /// Unable to find a specific block.
     #[error("block does not exist {0:?}")]
     BlockNotFound(BlockHashOrNumber),
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index 2ac3b2131..b82286c1c 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -91,7 +91,7 @@ pub use transaction::{
     IntoRecoveredTransaction, InvalidTransactionError, PooledTransactionsElement,
     PooledTransactionsElementEcRecovered, Signature, Transaction, TransactionKind,
     TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxEip1559, TxEip2930,
-    TxEip4844, TxLegacy, TxType, TxValue, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
+    TxEip4844, TxHashOrNumber, TxLegacy, TxType, TxValue, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
     EIP4844_TX_TYPE_ID, LEGACY_TX_TYPE_ID,
 };
 pub use withdrawal::Withdrawal;
diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs
index 6355ff0ef..a61dedfb3 100644
--- a/crates/primitives/src/snapshot/mod.rs
+++ b/crates/primitives/src/snapshot/mod.rs
@@ -6,4 +6,4 @@ mod segment;
 
 pub use compression::Compression;
 pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
-pub use segment::SnapshotSegment;
+pub use segment::{SegmentHeader, SnapshotSegment};
diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs
index 8902e5005..cbd9ad432 100644
--- a/crates/primitives/src/snapshot/segment.rs
+++ b/crates/primitives/src/snapshot/segment.rs
@@ -1,4 +1,6 @@
+use crate::{BlockNumber, TxNumber};
 use serde::{Deserialize, Serialize};
+use std::ops::RangeInclusive;
 
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
@@ -11,3 +13,30 @@ pub enum SnapshotSegment {
     /// Snapshot segment responsible for the `Receipts` table.
     Receipts,
 }
+
+/// A segment header that contains information common to all segments. Used for storage.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SegmentHeader {
+    block_range: RangeInclusive<BlockNumber>,
+    tx_range: RangeInclusive<TxNumber>,
+}
+
+impl SegmentHeader {
+    /// Creates a new [`SegmentHeader`].
+    pub fn new(
+        block_range: RangeInclusive<BlockNumber>,
+        tx_range: RangeInclusive<TxNumber>,
+    ) -> Self {
+        Self { block_range, tx_range }
+    }
+
+    /// Returns the first block number of the segment.
+    pub fn block_start(&self) -> BlockNumber {
+        *self.block_range.start()
+    }
+
+    /// Returns the first transaction number of the segment.
+    pub fn tx_start(&self) -> TxNumber {
+        *self.tx_range.start()
+    }
+}
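`SegmentHeader` carries both the block range and the transaction range, which is what lets a snapshot file translate absolute block and transaction numbers into row offsets; later hunks in this patch compute `num - self.jar.user_header().block_start()` and `num - self.jar.user_header().tx_start()` for exactly that purpose. A standalone sketch of the arithmetic, with the `row_offset_*` helpers added for illustration only:

use std::ops::RangeInclusive;

type BlockNumber = u64;
type TxNumber = u64;

struct SegmentHeader {
    block_range: RangeInclusive<BlockNumber>,
    tx_range: RangeInclusive<TxNumber>,
}

impl SegmentHeader {
    fn block_start(&self) -> BlockNumber {
        *self.block_range.start()
    }
    fn tx_start(&self) -> TxNumber {
        *self.tx_range.start()
    }

    // Row index of a block inside the segment, as `header_by_number` does.
    fn row_offset_block(&self, num: BlockNumber) -> usize {
        (num - self.block_start()) as usize
    }
    // Row index of a transaction, as `transaction_by_id` does.
    fn row_offset_tx(&self, num: TxNumber) -> usize {
        (num - self.tx_start()) as usize
    }
}

fn main() {
    let header =
        SegmentHeader { block_range: 500_000..=999_999, tx_range: 1_234_567..=2_345_678 };
    assert_eq!(header.row_offset_block(500_123), 123);
    assert_eq!(header.row_offset_tx(1_234_570), 3);
}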
diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs
index 664fd3137..1d1dc1228 100644
--- a/crates/primitives/src/transaction/mod.rs
+++ b/crates/primitives/src/transaction/mod.rs
@@ -1,6 +1,6 @@
 use crate::{
     compression::{TRANSACTION_COMPRESSOR, TRANSACTION_DECOMPRESSOR},
-    keccak256, Address, Bytes, TxHash, B256,
+    keccak256, Address, BlockHashOrNumber, Bytes, TxHash, B256,
 };
 use alloy_rlp::{
     Decodable, Encodable, Error as RlpError, Header, EMPTY_LIST_CODE, EMPTY_STRING_CODE,
@@ -1307,6 +1307,9 @@ impl IntoRecoveredTransaction for TransactionSignedEcRecovered {
     }
 }
 
+/// Either a transaction hash or number.
+pub type TxHashOrNumber = BlockHashOrNumber;
+
 #[cfg(test)]
 mod tests {
     use crate::{
diff --git a/crates/snapshot/src/segments/headers.rs b/crates/snapshot/src/segments/headers.rs
index 2de938b11..4cc3ced20 100644
--- a/crates/snapshot/src/segments/headers.rs
+++ b/crates/snapshot/src/segments/headers.rs
@@ -1,6 +1,6 @@
-use crate::segments::{prepare_jar, Segment};
+use crate::segments::{prepare_jar, Segment, SegmentHeader};
 use reth_db::{
-    cursor::DbCursorRO, snapshot::create_snapshot_T1_T2_T3, table::Table, tables,
+    cursor::DbCursorRO, database::Database, snapshot::create_snapshot_T1_T2_T3, tables,
     transaction::DbTx, RawKey, RawTable,
 };
 use reth_interfaces::RethResult;
@@ -8,6 +8,7 @@ use reth_primitives::{
     snapshot::{Compression, Filters},
     BlockNumber, SnapshotSegment,
 };
+use reth_provider::DatabaseProviderRO;
 use std::ops::RangeInclusive;
 
 /// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
@@ -22,28 +23,17 @@ impl Headers {
     pub fn new(compression: Compression, filters: Filters) -> Self {
         Self { compression, filters }
     }
-
-    // Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
-    fn dataset_for_compression<T: Table<Key = BlockNumber>>(
-        &self,
-        tx: &impl DbTx,
-        range: &RangeInclusive<BlockNumber>,
-        range_len: usize,
-    ) -> RethResult<Vec<Vec<u8>>> {
-        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
-        Ok(cursor
-            .walk_back(Some(RawKey::from(*range.end())))?
-            .take(range_len.min(1000))
-            .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
-            .collect::<Vec<_>>())
-    }
 }
 
 impl Segment for Headers {
-    fn snapshot(&self, tx: &impl DbTx, range: RangeInclusive<BlockNumber>) -> RethResult<()> {
+    fn snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()> {
         let range_len = range.clone().count();
-        let mut jar = prepare_jar::<3, tables::Headers>(
-            tx,
+        let mut jar = prepare_jar::<DB, 3>(
+            provider,
             SnapshotSegment::Headers,
             self.filters,
             self.compression,
@@ -51,17 +41,21 @@ impl Segment for Headers {
             range_len,
             || {
                 Ok([
-                    self.dataset_for_compression::<tables::Headers>(tx, &range, range_len)?,
-                    self.dataset_for_compression::<tables::HeaderTD>(tx, &range, range_len)?,
-                    self.dataset_for_compression::<tables::CanonicalHeaders>(
-                        tx, &range, range_len,
+                    self.dataset_for_compression::<DB, tables::Headers>(
+                        provider, &range, range_len,
+                    )?,
+                    self.dataset_for_compression::<DB, tables::HeaderTD>(
+                        provider, &range, range_len,
+                    )?,
+                    self.dataset_for_compression::<DB, tables::CanonicalHeaders>(
+                        provider, &range, range_len,
                     )?,
                 ])
             },
         )?;
 
         // Generate list of hashes for filters & PHF
-        let mut cursor = tx.cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
+        let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
         let mut hashes = None;
         if self.filters.has_filters() {
             hashes = Some(
@@ -77,8 +71,9 @@ impl Segment for Headers {
             tables::HeaderTD,
             tables::CanonicalHeaders,
             BlockNumber,
+            SegmentHeader,
         >(
-            tx,
+            provider.tx_ref(),
             range,
             None,
             // We already prepared the dictionary beforehand
diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs
index 1d9ee6a3a..8d649115d 100644
--- a/crates/snapshot/src/segments/mod.rs
+++ b/crates/snapshot/src/segments/mod.rs
@@ -1,16 +1,24 @@
 //! Snapshot segment implementations and utilities.
 
-mod headers;
+mod transactions;
+pub use transactions::Transactions;
 
+mod headers;
 pub use headers::Headers;
 
-use reth_db::{table::Table, transaction::DbTx};
+mod receipts;
+pub use receipts::Receipts;
+
+use reth_db::{
+    cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx, RawKey, RawTable,
+};
 use reth_interfaces::RethResult;
 use reth_nippy_jar::NippyJar;
 use reth_primitives::{
-    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
+    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
     BlockNumber, SnapshotSegment,
 };
+use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
 use std::{ops::RangeInclusive, path::PathBuf};
 
 pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
@@ -18,22 +26,43 @@ pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
 /// A segment represents a snapshotting of some portion of the data.
 pub trait Segment {
     /// Snapshot data using the provided range.
-    fn snapshot(&self, tx: &impl DbTx, range: RangeInclusive<BlockNumber>) -> RethResult<()>;
+    fn snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()>;
+
+    /// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
+    fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        range: &RangeInclusive<u64>,
+        range_len: usize,
+    ) -> RethResult<Vec<Vec<u8>>> {
+        let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;
+        Ok(cursor
+            .walk_back(Some(RawKey::from(*range.end())))?
+            .take(range_len.min(1000))
+            .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
+            .collect::<Vec<_>>())
+    }
 }
 
 /// Returns a [`NippyJar`] according to the desired configuration.
-pub(crate) fn prepare_jar<const COLUMNS: usize, T: Table>(
-    tx: &impl DbTx,
+pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
+    provider: &DatabaseProviderRO<'_, DB>,
     segment: SnapshotSegment,
     filters: Filters,
     compression: Compression,
-    range: RangeInclusive<BlockNumber>,
-    range_len: usize,
+    block_range: RangeInclusive<BlockNumber>,
+    total_rows: usize,
     prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
-) -> RethResult<NippyJar> {
-    let mut nippy_jar = NippyJar::new_without_header(
+) -> RethResult<NippyJar<SegmentHeader>> {
+    let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
+    let mut nippy_jar = NippyJar::new(
         COLUMNS,
-        &get_snapshot_segment_file_name(segment, filters, compression, &range),
+        &get_snapshot_segment_file_name(segment, filters, compression, &block_range),
+        SegmentHeader::new(block_range, tx_range),
     );
 
     nippy_jar = match compression {
@@ -50,7 +79,6 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
     };
 
     if let Filters::WithFilters(inclusion_filter, phf) = filters {
-        let total_rows = (tx.entries::<T>()? - *range.start() as usize).min(range_len);
         nippy_jar = match inclusion_filter {
             InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
         };
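The default `dataset_for_compression` above selects the zstd-dictionary training set by walking the table backwards from the end of the range and keeping at most 1000 of the most recent rows. A sketch of the same selection logic, with a plain `Vec` standing in for the database cursor:

/// Keep at most 1000 of the most recent rows as the dictionary training set.
fn training_set(rows: &[Vec<u8>], range_len: usize) -> Vec<Vec<u8>> {
    rows.iter().rev().take(range_len.min(1000)).cloned().collect()
}

fn main() {
    let rows: Vec<Vec<u8>> = (0u16..5000).map(|i| i.to_le_bytes().to_vec()).collect();
    let set = training_set(&rows, 2000);
    assert_eq!(set.len(), 1000); // capped at 1000 ...
    assert_eq!(set[0], 4999u16.to_le_bytes().to_vec()); // ... starting from the newest row
}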
diff --git a/crates/snapshot/src/segments/receipts.rs b/crates/snapshot/src/segments/receipts.rs
new file mode 100644
index 000000000..4fb2e399d
--- /dev/null
+++ b/crates/snapshot/src/segments/receipts.rs
@@ -0,0 +1,74 @@
+use crate::segments::{prepare_jar, Segment};
+use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
+use reth_interfaces::RethResult;
+use reth_primitives::{
+    snapshot::{Compression, Filters, SegmentHeader},
+    BlockNumber, SnapshotSegment, TxNumber,
+};
+use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
+use std::ops::RangeInclusive;
+
+/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
+#[derive(Debug)]
+pub struct Receipts {
+    compression: Compression,
+    filters: Filters,
+}
+
+impl Receipts {
+    /// Creates new instance of [Receipts] snapshot segment.
+    pub fn new(compression: Compression, filters: Filters) -> Self {
+        Self { compression, filters }
+    }
+}
+
+impl Segment for Receipts {
+    fn snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        block_range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()> {
+        let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
+        let tx_range_len = tx_range.clone().count();
+
+        let mut jar = prepare_jar::<DB, 1>(
+            provider,
+            SnapshotSegment::Receipts,
+            self.filters,
+            self.compression,
+            block_range,
+            tx_range_len,
+            || {
+                Ok([self.dataset_for_compression::<DB, tables::Receipts>(
+                    provider,
+                    &tx_range,
+                    tx_range_len,
+                )?])
+            },
+        )?;
+
+        // Generate list of hashes for filters & PHF
+        let mut hashes = None;
+        if self.filters.has_filters() {
+            hashes = Some(
+                provider
+                    .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
+                    .into_iter()
+                    .map(|(tx, _)| Ok(tx)),
+            );
+        }
+
+        create_snapshot_T1::<tables::Receipts, TxNumber, SegmentHeader>(
+            provider.tx_ref(),
+            tx_range,
+            None,
+            // We already prepared the dictionary beforehand
+            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
+            hashes,
+            tx_range_len,
+            &mut jar,
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/crates/snapshot/src/segments/transactions.rs b/crates/snapshot/src/segments/transactions.rs
new file mode 100644
index 000000000..09d120c09
--- /dev/null
+++ b/crates/snapshot/src/segments/transactions.rs
@@ -0,0 +1,74 @@
+use crate::segments::{prepare_jar, Segment};
+use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
+use reth_interfaces::RethResult;
+use reth_primitives::{
+    snapshot::{Compression, Filters, SegmentHeader},
+    BlockNumber, SnapshotSegment, TxNumber,
+};
+use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
+use std::ops::RangeInclusive;
+
+/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
+#[derive(Debug)]
+pub struct Transactions {
+    compression: Compression,
+    filters: Filters,
+}
+
+impl Transactions {
+    /// Creates new instance of [Transactions] snapshot segment.
+    pub fn new(compression: Compression, filters: Filters) -> Self {
+        Self { compression, filters }
+    }
+}
+
+impl Segment for Transactions {
+    fn snapshot<DB: Database>(
+        &self,
+        provider: &DatabaseProviderRO<'_, DB>,
+        block_range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()> {
+        let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
+        let tx_range_len = tx_range.clone().count();
+
+        let mut jar = prepare_jar::<DB, 1>(
+            provider,
+            SnapshotSegment::Transactions,
+            self.filters,
+            self.compression,
+            block_range,
+            tx_range_len,
+            || {
+                Ok([self.dataset_for_compression::<DB, tables::Transactions>(
+                    provider,
+                    &tx_range,
+                    tx_range_len,
+                )?])
+            },
+        )?;
+
+        // Generate list of hashes for filters & PHF
+        let mut hashes = None;
+        if self.filters.has_filters() {
+            hashes = Some(
+                provider
+                    .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
+                    .into_iter()
+                    .map(|(tx, _)| Ok(tx)),
+            );
+        }
+
+        create_snapshot_T1::<tables::Transactions, TxNumber, SegmentHeader>(
+            provider.tx_ref(),
+            tx_range,
+            None,
+            // We already prepared the dictionary beforehand
+            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
+            hashes,
+            tx_range_len,
+            &mut jar,
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs
index 697c18707..758fa4033 100644
--- a/crates/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/src/stages/tx_lookup.rs
@@ -1,23 +1,20 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
-use itertools::Itertools;
 use rayon::prelude::*;
 use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
     tables,
     transaction::{DbTx, DbTxMut},
-    DatabaseError,
 };
 use reth_interfaces::provider::ProviderError;
 use reth_primitives::{
-    keccak256,
     stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
-    PruneCheckpoint, PruneMode, PruneSegment, TransactionSignedNoHash, TxNumber, B256,
+    PruneCheckpoint, PruneMode, PruneSegment,
 };
 use reth_provider::{
     BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
+    TransactionsProviderExt,
 };
-use tokio::sync::mpsc;
 use tracing::*;
 
 /// The transaction lookup stage.
@@ -93,49 +90,15 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         let (tx_range, block_range, is_final_range) =
             input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
         let end_block = *block_range.end();
-        let tx_range_size = tx_range.clone().count();
 
         debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup");
 
-        let tx = provider.tx_ref();
-        let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
-        let tx_walker = tx_cursor.walk_range(tx_range)?;
-
-        let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
-        let mut channels = Vec::with_capacity(chunk_size);
-        let mut transaction_count = 0;
-
-        for chunk in &tx_walker.chunks(chunk_size) {
-            let (tx, rx) = mpsc::unbounded_channel();
-            channels.push(rx);
-
-            // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
-            let chunk: Vec<_> = chunk.collect();
-            transaction_count += chunk.len();
-
-            // Spawn the task onto the global rayon pool
-            // This task will send the results through the channel after it has calculated the hash.
-            rayon::spawn(move || {
-                let mut rlp_buf = Vec::with_capacity(128);
-                for entry in chunk {
-                    rlp_buf.clear();
-                    let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
-                }
-            });
-        }
-        let mut tx_list = Vec::with_capacity(transaction_count);
-
-        // Iterate over channels and append the tx hashes to be sorted out later
-        for mut channel in channels {
-            while let Some(tx) = channel.recv().await {
-                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
-                tx_list.push((tx_hash, tx_id));
-            }
-        }
+        let mut tx_list = provider.transaction_hashes_by_range(tx_range)?;
 
         // Sort before inserting the reverse lookup for hash -> tx_id.
         tx_list.par_sort_unstable_by(|txa, txb| txa.0.cmp(&txb.0));
+
+        let tx = provider.tx_ref();
         let mut txhash_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
 
         // If the last inserted element in the database is equal or bigger than the first
@@ -201,17 +164,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         }
     }
 }
-
-/// Calculates the hash of the given transaction
-#[inline]
-fn calculate_hash(
-    entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
-    rlp_buf: &mut Vec<u8>,
-) -> Result<(B256, TxNumber), Box<StageError>> {
-    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
-    tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
-    Ok((keccak256(rlp_buf), tx_id))
-}
 
 fn stage_checkpoint<DB: Database>(
     provider: &DatabaseProviderRW<'_, &DB>,
 ) -> Result<EntitiesCheckpoint, StageError> {
diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml
index 937bb232b..2920ffd68 100644
--- a/crates/storage/db/Cargo.toml
+++ b/crates/storage/db/Cargo.toml
@@ -45,6 +45,8 @@ parking_lot.workspace = true
 derive_more = "0.99"
 eyre.workspace = true
 paste = "1.0"
+rayon.workspace = true
+itertools.workspace = true
 
 # arbitrary utils
 arbitrary = { workspace = true, features = ["derive"], optional = true }
diff --git a/crates/storage/db/src/snapshot.rs b/crates/storage/db/src/snapshot.rs
index 7f6237987..7cb27847f 100644
--- a/crates/storage/db/src/snapshot.rs
+++ b/crates/storage/db/src/snapshot.rs
@@ -9,6 +9,7 @@ use crate::{
 use reth_interfaces::RethResult;
 use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
 use reth_tracing::tracing::*;
+use serde::{Deserialize, Serialize};
 use std::{error::Error as StdError, ops::RangeInclusive};
 
 /// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`] and
@@ -34,7 +35,8 @@ macro_rules! generate_snapshot_func {
             #[allow(non_snake_case)]
             pub fn [<create_snapshot$(_ $tbl)+>]<
                 $($tbl: Table<Key = K>,)+
-                K
+                K,
+                H: for<'a> Deserialize<'a> + Send + Serialize + Sync + std::fmt::Debug
             >
             (
                 tx: &impl DbTx,
@@ -43,7 +45,7 @@ macro_rules! generate_snapshot_func {
                 dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
                 keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
                 row_count: usize,
-                nippy_jar: &mut NippyJar
+                nippy_jar: &mut NippyJar<H>
             ) -> RethResult<()>
                 where K: Key + Copy
             {
diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs
index c100d5a1e..87118a635 100644
--- a/crates/storage/provider/src/lib.rs
+++ b/crates/storage/provider/src/lib.rs
@@ -25,7 +25,7 @@ pub use traits::{
     PruneCheckpointWriter, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader,
     StageCheckpointWriter, StateProvider, StateProviderBox, StateProviderFactory,
     StateRootProvider, StorageReader, TransactionVariant, TransactionsProvider,
-    WithdrawalsProvider,
+    TransactionsProviderExt, WithdrawalsProvider,
 };
 
 /// Provider trait implementations.
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index 913503f09..f92a37c4b 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -6,7 +6,8 @@ use crate::{
     AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
     Chain, EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, OriginalValuesKnown,
     ProviderError, PruneCheckpointReader, PruneCheckpointWriter, StageCheckpointReader,
-    StorageReader, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
+    StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
+    WithdrawalsProvider,
 };
 use itertools::{izip, Itertools};
 use reth_db::{
@@ -24,7 +25,7 @@ use reth_db::{
 };
 use reth_interfaces::{
     executor::{BlockExecutionError, BlockValidationError},
-    RethResult,
+    RethError, RethResult,
 };
 use reth_primitives::{
     keccak256,
@@ -46,7 +47,7 @@ use std::{
     collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
     fmt::Debug,
     ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
-    sync::Arc,
+    sync::{mpsc, Arc},
 };
 
 /// A [`DatabaseProvider`] that holds a read-only database transaction.
@@ -1140,6 +1141,63 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
     }
 }
 
+impl<TX: DbTx> TransactionsProviderExt for DatabaseProvider<TX> {
+    /// Recovers transaction hashes by walking through the `Transactions` table and
+    /// calculating them in a parallel manner. Returned unsorted.
+    fn transaction_hashes_by_range(
+        &self,
+        tx_range: Range<TxNumber>,
+    ) -> RethResult<Vec<(TxHash, TxNumber)>> {
+        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
+        let tx_range_size = tx_range.clone().count();
+        let tx_walker = tx_cursor.walk_range(tx_range)?;
+
+        let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
+        let mut channels = Vec::with_capacity(chunk_size);
+        let mut transaction_count = 0;
+
+        #[inline]
+        fn calculate_hash(
+            entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
+            rlp_buf: &mut Vec<u8>,
+        ) -> Result<(B256, TxNumber), Box<RethError>> {
+            let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
+            tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
+            Ok((keccak256(rlp_buf), tx_id))
+        }
+
+        for chunk in &tx_walker.chunks(chunk_size) {
+            let (tx, rx) = mpsc::channel();
+            channels.push(rx);
+
+            // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
+            let chunk: Vec<_> = chunk.collect();
+            transaction_count += chunk.len();
+
+            // Spawn the task onto the global rayon pool
+            // This task will send the results through the channel after it has calculated the hash.
+            rayon::spawn(move || {
+                let mut rlp_buf = Vec::with_capacity(128);
+                for entry in chunk {
+                    rlp_buf.clear();
+                    let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
+                }
+            });
+        }
+        let mut tx_list = Vec::with_capacity(transaction_count);
+
+        // Iterate over channels and append the tx hashes unsorted
+        for channel in channels {
+            while let Ok(tx) = channel.recv() {
+                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
+                tx_list.push((tx_hash, tx_id));
+            }
+        }
+
+        Ok(tx_list)
+    }
+}
+
 impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
     fn transaction_id(&self, tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
         Ok(self.tx.get::<tables::TxHashNumber>(tx_hash)?)
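`transaction_hashes_by_range` above is the fan-out/fan-in pattern the transaction-lookup stage previously inlined: chunk the cursor walk, hash each chunk on the global rayon pool, and drain the per-chunk channels in spawn order. A self-contained sketch of the same shape, assuming only the `rayon` crate and substituting a placeholder for the RLP-encode-plus-keccak256 work:

use std::sync::mpsc;

// Placeholder for the real per-transaction work (RLP encode + keccak256).
fn fake_hash(n: u64) -> u64 {
    n.wrapping_mul(0x9E37_79B9_7F4A_7C15)
}

fn parallel_hashes(ids: Vec<u64>) -> Vec<(u64, u64)> {
    let chunk_size = (ids.len() / rayon::current_num_threads()).max(1);
    let mut channels = Vec::new();

    for chunk in ids.chunks(chunk_size) {
        let (tx, rx) = mpsc::channel();
        channels.push(rx);
        // `chunks` borrows `ids`; `rayon::spawn` needs an owned, 'static payload.
        let chunk = chunk.to_vec();
        rayon::spawn(move || {
            for id in chunk {
                let _ = tx.send((fake_hash(id), id));
            }
        });
    }

    // Receivers are drained in spawn order, so results stay grouped by chunk;
    // `recv` returns Err once a chunk's sender is dropped, ending that drain.
    let mut out = Vec::with_capacity(ids.len());
    for rx in channels {
        while let Ok(pair) = rx.recv() {
            out.push(pair);
        }
    }
    out
}

fn main() {
    let hashes = parallel_hashes((0..10_000).collect());
    assert_eq!(hashes.len(), 10_000);
}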
diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs
index ec2c36e6b..6c81965d7 100644
--- a/crates/storage/provider/src/providers/snapshot.rs
+++ b/crates/storage/provider/src/providers/snapshot.rs
@@ -1,11 +1,15 @@
-use crate::HeaderProvider;
+use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
 use reth_db::{
     table::{Decompress, Table},
     HeaderTD,
 };
 use reth_interfaces::{provider::ProviderError, RethResult};
 use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
-use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256};
+use reth_primitives::{
+    snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
+    SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
+    B256, U256,
+};
 use std::ops::RangeBounds;
 
 /// SnapshotProvider
@@ -16,14 +20,12 @@ use std::ops::RangeBounds;
 #[derive(Debug)]
 pub struct SnapshotProvider<'a> {
     /// NippyJar
-    pub jar: &'a NippyJar,
-    /// Starting snapshot block
-    pub jar_start_block: u64,
+    pub jar: &'a NippyJar<SegmentHeader>,
 }
 
 impl<'a> SnapshotProvider<'a> {
     /// Creates cursor
-    pub fn cursor(&self) -> NippyJarCursor<'a> {
+    pub fn cursor(&self) -> NippyJarCursor<'a, SegmentHeader> {
         NippyJarCursor::new(self.jar, None).unwrap()
     }
 
@@ -31,7 +33,7 @@ impl<'a> SnapshotProvider<'a> {
     pub fn cursor_with_decompressors(
         &self,
         decompressors: Vec<Decompressor<'a>>,
-    ) -> NippyJarCursor<'a> {
+    ) -> NippyJarCursor<'a, SegmentHeader> {
         NippyJarCursor::new(self.jar, Some(decompressors)).unwrap()
     }
 }
@@ -57,7 +59,9 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
     fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
         Header::decompress(
             self.cursor()
-                .row_by_number_with_cols::<0b01, 2>((num - self.jar_start_block) as usize)?
+                .row_by_number_with_cols::<0b01, 2>(
+                    (num - self.jar.user_header().block_start()) as usize,
+                )?
                 .ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
         )
         .map(Some)
@@ -101,6 +105,122 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
     }
 }
 
+impl<'a> BlockHashReader for SnapshotProvider<'a> {
+    fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
+        todo!()
+    }
+
+    fn canonical_hashes_range(
+        &self,
+        _start: BlockNumber,
+        _end: BlockNumber,
+    ) -> RethResult<Vec<BlockHash>> {
+        todo!()
+    }
+}
+
+impl<'a> BlockNumReader for SnapshotProvider<'a> {
+    fn chain_info(&self) -> RethResult<ChainInfo> {
+        todo!()
+    }
+
+    fn best_block_number(&self) -> RethResult<BlockNumber> {
+        todo!()
+    }
+
+    fn last_block_number(&self) -> RethResult<BlockNumber> {
+        todo!()
+    }
+
+    fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
+        todo!()
+    }
+}
+
+impl<'a> TransactionsProvider for SnapshotProvider<'a> {
+    fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
+        todo!()
+    }
+
+    fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
+        TransactionSignedNoHash::decompress(
+            self.cursor()
+                .row_by_number_with_cols::<0b1, 1>(
+                    (num - self.jar.user_header().tx_start()) as usize,
+                )?
+                .ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
+        )
+        .map(Into::into)
+        .map(Some)
+        .map_err(Into::into)
+    }
+
+    fn transaction_by_id_no_hash(
+        &self,
+        _id: TxNumber,
+    ) -> RethResult<Option<TransactionSignedNoHash>> {
+        todo!()
+    }
+
+    fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
+        // WIP
+        let mut cursor = self.cursor();
+
+        let tx = TransactionSignedNoHash::decompress(
+            cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
+        )
+        .unwrap()
+        .with_hash();
+
+        if tx.hash() == hash {
+            return Ok(Some(tx))
+        } else {
+            // check next snapshot
+        }
+        Ok(None)
+    }
+
+    fn transaction_by_hash_with_meta(
+        &self,
+        _hash: TxHash,
+    ) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
+        todo!()
+    }
+
+    fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
+        todo!()
+    }
+
+    fn transactions_by_block(
+        &self,
+        _block_id: BlockHashOrNumber,
+    ) -> RethResult<Option<Vec<TransactionSigned>>> {
+        todo!()
+    }
+
+    fn transactions_by_block_range(
+        &self,
+        _range: impl RangeBounds<BlockNumber>,
+    ) -> RethResult<Vec<Vec<TransactionSigned>>> {
+        todo!()
+    }
+
+    fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
+        todo!()
+    }
+
+    fn transactions_by_tx_range(
+        &self,
+        _range: impl RangeBounds<TxNumber>,
+    ) -> RethResult<Vec<TransactionSignedNoHash>> {
+        todo!()
+    }
+
+    fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
+        todo!()
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -123,6 +243,7 @@ mod test {
         // Ranges
         let row_count = 100u64;
         let range = 0..=(row_count - 1);
+        let segment_header = SegmentHeader::new(range.clone(), range.clone());
 
         // Data sources
         let db = create_test_rw_db();
@@ -157,7 +278,7 @@ mod test {
         let with_compression = true;
         let with_filter = true;
 
-        let mut nippy_jar = NippyJar::new_without_header(2, snap_file.path());
+        let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header);
 
         if with_compression {
             nippy_jar = nippy_jar.with_zstd(false, 0);
@@ -180,7 +301,7 @@ mod test {
             .unwrap()
             .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
 
-        create_snapshot_T1_T2::<tables::Headers, tables::HeaderTD, BlockNumber>(
+        create_snapshot_T1_T2::<tables::Headers, tables::HeaderTD, BlockNumber, SegmentHeader>(
             &tx,
             range,
             None,
@@ -194,10 +315,10 @@ mod test {
 
         // Use providers to query Header data and compare if it matches
         {
-            let jar = NippyJar::load_without_header(snap_file.path()).unwrap();
+            let jar = NippyJar::load(snap_file.path()).unwrap();
             let db_provider = factory.provider().unwrap();
-            let snap_provider = SnapshotProvider { jar: &jar, jar_start_block: 0 };
+            let snap_provider = SnapshotProvider { jar: &jar };
 
             assert!(!headers.is_empty());
diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs
index ae6d36387..8134a1961 100644
--- a/crates/storage/provider/src/traits/mod.rs
+++ b/crates/storage/provider/src/traits/mod.rs
@@ -37,7 +37,7 @@ pub use state::{
 };
 
 mod transactions;
-pub use transactions::TransactionsProvider;
+pub use transactions::{TransactionsProvider, TransactionsProviderExt};
 
 mod withdrawals;
 pub use withdrawals::WithdrawalsProvider;
diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs
index 711b46c4f..2f9c72ed1 100644
--- a/crates/storage/provider/src/traits/transactions.rs
+++ b/crates/storage/provider/src/traits/transactions.rs
@@ -1,10 +1,10 @@
-use crate::BlockNumReader;
-use reth_interfaces::RethResult;
+use crate::{BlockNumReader, BlockReader};
+use reth_interfaces::{provider::ProviderError, RethResult};
 use reth_primitives::{
     Address, BlockHashOrNumber, BlockNumber, TransactionMeta, TransactionSigned,
     TransactionSignedNoHash, TxHash, TxNumber,
 };
-use std::ops::RangeBounds;
+use std::ops::{Range, RangeBounds, RangeInclusive};
 
 /// Client trait for fetching [TransactionSigned] related data.
 #[auto_impl::auto_impl(&, Arc)]
@@ -63,3 +63,31 @@ pub trait TransactionsProvider: BlockNumReader + Send + Sync {
     /// Returns None if the transaction is not found.
     fn transaction_sender(&self, id: TxNumber) -> RethResult<Option<Address>>;
 }
+
+/// Client trait for fetching additional [TransactionSigned] related data.
+#[auto_impl::auto_impl(&, Arc)]
+pub trait TransactionsProviderExt: BlockReader + Send + Sync {
+    /// Get transactions range by block range.
+    fn transaction_range_by_block_range(
+        &self,
+        block_range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<RangeInclusive<TxNumber>> {
+        let from = self
+            .block_body_indices(*block_range.start())?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(*block_range.start()))?
+            .first_tx_num();
+
+        let to = self
+            .block_body_indices(*block_range.end())?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(*block_range.end()))?
+            .last_tx_num();
+
+        Ok(from..=to)
+    }
+
+    /// Get transaction hashes from a transaction range.
+    fn transaction_hashes_by_range(
+        &self,
+        tx_range: Range<TxNumber>,
+    ) -> RethResult<Vec<(TxHash, TxNumber)>>;
+}
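The default `transaction_range_by_block_range` is built entirely from block body indices: the first block's `first_tx_num` and the last block's `last_tx_num` bound the inclusive transaction-number range. A simplified, self-contained model of that computation — `BlockBodyIndices` here is a stand-in for reth's `StoredBlockBodyIndices`:

use std::ops::RangeInclusive;

#[derive(Clone, Copy)]
struct BlockBodyIndices {
    first_tx_num: u64,
    tx_count: u64,
}

impl BlockBodyIndices {
    fn last_tx_num(&self) -> u64 {
        // Saturating to stay sane for empty blocks (tx_count == 0).
        (self.first_tx_num + self.tx_count).saturating_sub(1)
    }
}

/// Fold a contiguous run of block body indices into one inclusive tx range.
fn transaction_range(bodies: &[BlockBodyIndices]) -> Option<RangeInclusive<u64>> {
    let first = bodies.first()?;
    let last = bodies.last()?;
    Some(first.first_tx_num..=last.last_tx_num())
}

fn main() {
    // Three consecutive blocks with 2, 0 and 3 transactions.
    let bodies = [
        BlockBodyIndices { first_tx_num: 10, tx_count: 2 },
        BlockBodyIndices { first_tx_num: 12, tx_count: 0 },
        BlockBodyIndices { first_tx_num: 12, tx_count: 3 },
    ];
    assert_eq!(transaction_range(&bodies), Some(10..=14));
}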