feat: add reth db snapshot transactions | receipts commands (#5007)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Authored by joshieDo on 2023-10-26 13:01:29 +01:00, committed by GitHub
parent c1a6e42d19
commit 0116b80414
23 changed files with 924 additions and 179 deletions

Cargo.lock

@ -5833,6 +5833,7 @@ dependencies = [
"futures",
"heapless",
"iai",
"itertools 0.11.0",
"metrics",
"modular-bitfield",
"page_size",
@ -5844,6 +5845,7 @@ dependencies = [
"proptest",
"proptest-derive",
"rand 0.8.5",
"rayon",
"reth-codecs",
"reth-interfaces",
"reth-libmdbx",


@ -4,7 +4,7 @@ use reth_primitives::{
ChainSpec, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{sync::Arc, time::Instant};
use std::{fmt::Debug, sync::Arc, time::Instant};
#[derive(Debug)]
pub(crate) enum BenchKind {
@ -14,7 +14,7 @@ pub(crate) enum BenchKind {
RandomHash,
}
pub(crate) fn bench<F1, F2>(
pub(crate) fn bench<F1, F2, R>(
bench_kind: BenchKind,
db: (DatabaseEnvRO, Arc<ChainSpec>),
segment: SnapshotSegment,
@ -24,28 +24,34 @@ pub(crate) fn bench<F1, F2>(
database_method: F2,
) -> eyre::Result<()>
where
F1: FnMut() -> eyre::Result<()>,
F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
F1: FnMut() -> eyre::Result<R>,
F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<R>,
R: Debug + PartialEq,
{
let (db, chain) = db;
println!();
println!("############");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
{
let snap_result = {
let start = Instant::now();
snapshot_method()?;
let result = snapshot_method()?;
let end = start.elapsed().as_micros();
println!("# snapshot {bench_kind:?} | {end} μs");
}
{
result
};
let db_result = {
let factory = ProviderFactory::new(db, chain);
let provider = factory.provider()?;
let start = Instant::now();
database_method(provider)?;
let result = database_method(provider)?;
let end = start.elapsed().as_micros();
println!("# database {bench_kind:?} | {end} μs");
}
result
};
assert_eq!(snap_result, db_result);
Ok(())
}
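
The `bench` helper now requires both closures to return the same value type (`R: Debug + PartialEq`), so the snapshot and database code paths are timed and then cross-checked with `assert_eq!`. A minimal, self-contained sketch of that contract (the function and closure names here are illustrative, not part of the commit):

```rust
use std::{fmt::Debug, time::Instant};

/// Minimal sketch: time two code paths that must produce the same value,
/// then compare the results, mirroring the new `bench` contract.
fn time_and_compare<R, F1, F2>(mut snapshot_path: F1, database_path: F2) -> R
where
    R: Debug + PartialEq,
    F1: FnMut() -> R,
    F2: Fn() -> R,
{
    let start = Instant::now();
    let snap_result = snapshot_path();
    println!("# snapshot | {} μs", start.elapsed().as_micros());

    let start = Instant::now();
    let db_result = database_path();
    println!("# database | {} μs", start.elapsed().as_micros());

    // Both paths must agree, otherwise the benchmark compares different answers.
    assert_eq!(snap_result, db_result);
    snap_result
}

fn main() {
    let sum = time_and_compare(|| (1u64..=1_000).sum::<u64>(), || 1_000 * 1_001 / 2);
    println!("{sum:?}");
}
```

The equality check turns each benchmark run into a lightweight correctness test as well: a faster path that returns a different answer now fails loudly instead of silently reporting a timing.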


@ -2,23 +2,22 @@ use super::{
bench::{bench, BenchKind},
Command,
};
use crate::utils::DbTool;
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress, DatabaseEnvRO};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
use reth_provider::{DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory};
use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
use std::{path::Path, sync::Arc};
impl Command {
pub(crate) fn generate_headers_snapshot(
pub(crate) fn generate_headers_snapshot<DB: Database>(
&self,
tool: &DbTool<'_, DatabaseEnvRO>,
provider: &DatabaseProviderRO<'_, DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
@ -31,7 +30,7 @@ impl Command {
Filters::WithoutFilters
},
);
segment.snapshot(&tool.db.tx()?, self.from..=(self.from + self.block_interval - 1))?;
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
Ok(())
}
@ -56,7 +55,7 @@ impl Command {
let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load_without_header(&get_snapshot_segment_file_name(
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Headers,
filters,
compression,
@ -114,18 +113,16 @@ impl Command {
filters,
compression,
|| {
Header::decompress(
Ok(Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?[0],
)?;
Ok(())
)?)
},
|provider| {
provider
Ok(provider
.header_by_number(num as u64)?
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?;
Ok(())
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?)
},
)?;
}
@ -154,13 +151,12 @@ impl Command {
// Might be a false positive, so in the real world we have to validate it
assert_eq!(header.hash_slow(), header_hash);
Ok(())
Ok(header)
},
|provider| {
provider
Ok(provider
.header(&header_hash)?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?;
Ok(())
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?)
},
)?;
}


@ -1,44 +1,26 @@
use crate::{db::genesis_value_parser, utils::DbTool};
use clap::Parser;
use itertools::Itertools;
use reth_db::open_db_read_only;
use reth_db::{open_db_read_only, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{
compression::{DecoderDictionary, Decompressor},
NippyJar,
};
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
snapshot::{Compression, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::providers::SnapshotProvider;
use reth_provider::{providers::SnapshotProvider, ProviderFactory};
use std::{path::Path, sync::Arc};
mod bench;
mod headers;
mod receipts;
mod transactions;
#[derive(Parser, Debug)]
/// Arguments for the `reth db snapshot` command.
pub struct Command {
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
/// - holesky
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = genesis_value_parser,
global = true,
)]
chain: Arc<ChainSpec>,
/// Snapshot segments to generate.
segments: Vec<SnapshotSegment>,
@ -87,19 +69,33 @@ impl Command {
{
let db = open_db_read_only(db_path, None)?;
let tool = DbTool::new(&db, chain.clone())?;
let factory = ProviderFactory::new(db, chain.clone());
let provider = factory.provider()?;
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
SnapshotSegment::Headers => self.generate_headers_snapshot(
&tool,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Transactions => todo!(),
SnapshotSegment::Receipts => todo!(),
SnapshotSegment::Headers => self
.generate_headers_snapshot::<DatabaseEnvRO>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Transactions => self
.generate_transactions_snapshot::<DatabaseEnvRO>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Receipts => self
.generate_receipts_snapshot::<DatabaseEnvRO>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
}
}
}
@ -116,8 +112,22 @@ impl Command {
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Transactions => todo!(),
SnapshotSegment::Receipts => todo!(),
SnapshotSegment::Transactions => self.bench_transactions_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Receipts => self.bench_receipts_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
}
}
}
@ -129,7 +139,7 @@ impl Command {
/// [`DecoderDictionary`] and [`Decompressor`] if necessary.
fn prepare_jar_provider<'a>(
&self,
jar: &'a mut NippyJar,
jar: &'a mut NippyJar<SegmentHeader>,
dictionaries: &'a mut Option<Vec<DecoderDictionary<'_>>>,
) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
let mut decompressors: Vec<Decompressor<'_>> = vec![];
@ -140,6 +150,6 @@ impl Command {
}
}
Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from }, decompressors))
Ok((SnapshotProvider { jar: &*jar }, decompressors))
}
}


@ -0,0 +1,176 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
};
use std::{path::Path, sync::Arc};
impl Command {
pub(crate) fn generate_receipts_snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let segment = segments::Receipts::new(
compression,
if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
Ok(())
}
pub(crate) fn bench_receipts_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let block_range = self.from..=(self.from + self.block_interval - 1);
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Receipts,
filters,
compression,
&block_range,
))?;
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Receipts,
filters,
compression,
|| {
for num in row_indexes.iter() {
Receipt::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>(
(num - tx_range.start()) as usize,
)?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?[0],
)?;
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.receipt(num as
// u64)?.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.receipt(*num)?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
}
Ok(())
},
)?;
// For random walk
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM RECEIPT BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Receipts,
filters,
compression,
|| {
Ok(Receipt::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
.ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?[0],
)?)
},
|provider| {
Ok(provider
.receipt(num as u64)?
.ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?)
},
)?;
}
// BENCHMARK QUERYING A RANDOM RECEIPT BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let tx_hash =
ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.transaction_by_id(num)?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?
.hash();
bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Receipts,
filters,
compression,
|| {
let receipt = Receipt::decompress(
cursor
.row_by_key_with_cols::<0b1, 1>(tx_hash.as_slice())?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?[0],
)?;
Ok(receipt)
},
|provider| {
Ok(provider
.receipt_by_hash(tx_hash)?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?)
},
)?;
}
Ok(())
}
}


@ -0,0 +1,179 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, SnapshotSegment, TransactionSignedNoHash,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
};
use std::{path::Path, sync::Arc};
impl Command {
pub(crate) fn generate_transactions_snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let segment = segments::Transactions::new(
compression,
if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
Ok(())
}
pub(crate) fn bench_transactions_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let block_range = self.from..=(self.from + self.block_interval - 1);
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Transactions,
filters,
compression,
&block_range,
))?;
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Transactions,
filters,
compression,
|| {
for num in row_indexes.iter() {
TransactionSignedNoHash::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>(
(num - tx_range.start()) as usize,
)?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?[0],
)?
.with_hash();
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.transaction_by_id(num as
// u64)?.ok_or(ProviderError::TransactionNotFound((*num).into()))?;
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.transaction_by_id(*num)?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?;
}
Ok(())
},
)?;
// For random walk
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM TRANSACTION BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Transactions,
filters,
compression,
|| {
Ok(TransactionSignedNoHash::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
.ok_or(ProviderError::TransactionNotFound((num as u64).into()))?[0],
)?
.with_hash())
},
|provider| {
Ok(provider
.transaction_by_id(num as u64)?
.ok_or(ProviderError::TransactionNotFound((num as u64).into()))?)
},
)?;
}
// BENCHMARK QUERYING A RANDOM TRANSACTION BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let transaction_hash =
ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.transaction_by_id(num)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?
.hash();
bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, log_level)?, chain.clone()),
SnapshotSegment::Transactions,
filters,
compression,
|| {
let transaction = TransactionSignedNoHash::decompress(
cursor
.row_by_key_with_cols::<0b1, 1>(transaction_hash.as_slice())?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?[0],
)?;
// Might be a false positive, so in the real world we have to validate it
Ok(transaction.with_hash())
},
|provider| {
Ok(provider
.transaction_by_hash(transaction_hash)?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?)
},
)?;
}
Ok(())
}
}


@ -1,4 +1,6 @@
use reth_primitives::{Address, BlockHash, BlockHashOrNumber, BlockNumber, TxNumber, B256};
use reth_primitives::{
Address, BlockHash, BlockHashOrNumber, BlockNumber, TxHashOrNumber, TxNumber, B256,
};
/// Bundled errors variants thrown by various providers.
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
@ -40,6 +42,12 @@ pub enum ProviderError {
/// when required header related data was not found but was required.
#[error("no header found for {0:?}")]
HeaderNotFound(BlockHashOrNumber),
/// The specific transaction is missing.
#[error("no transaction found for {0:?}")]
TransactionNotFound(TxHashOrNumber),
/// The specific receipt is missing.
#[error("no receipt found for {0:?}")]
ReceiptNotFound(TxHashOrNumber),
/// Unable to find a specific block.
#[error("block does not exist {0:?}")]
BlockNotFound(BlockHashOrNumber),


@ -91,7 +91,7 @@ pub use transaction::{
IntoRecoveredTransaction, InvalidTransactionError, PooledTransactionsElement,
PooledTransactionsElementEcRecovered, Signature, Transaction, TransactionKind, TransactionMeta,
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxEip1559, TxEip2930,
TxEip4844, TxLegacy, TxType, TxValue, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
TxEip4844, TxHashOrNumber, TxLegacy, TxType, TxValue, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
EIP4844_TX_TYPE_ID, LEGACY_TX_TYPE_ID,
};
pub use withdrawal::Withdrawal;


@ -6,4 +6,4 @@ mod segment;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::SnapshotSegment;
pub use segment::{SegmentHeader, SnapshotSegment};


@ -1,4 +1,6 @@
use crate::{BlockNumber, TxNumber};
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
@ -11,3 +13,30 @@ pub enum SnapshotSegment {
/// Snapshot segment responsible for the `Receipts` table.
Receipts,
}
/// A segment header that contains information common to all segments. Used for storage.
#[derive(Debug, Serialize, Deserialize)]
pub struct SegmentHeader {
block_range: RangeInclusive<BlockNumber>,
tx_range: RangeInclusive<TxNumber>,
}
impl SegmentHeader {
/// Returns [`SegmentHeader`].
pub fn new(
block_range: RangeInclusive<BlockNumber>,
tx_range: RangeInclusive<TxNumber>,
) -> Self {
Self { block_range, tx_range }
}
/// Returns the first block number of the segment.
pub fn block_start(&self) -> BlockNumber {
*self.block_range.start()
}
/// Returns the first transaction number of the segment.
pub fn tx_start(&self) -> TxNumber {
*self.tx_range.start()
}
}
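
`SegmentHeader` is what `NippyJar` now stores as its user header, letting providers translate absolute block and transaction numbers into row offsets without the old external `jar_start_block` field. A small stand-alone sketch of that idea (the struct below is a simplified local mirror; the serde derives and the sample ranges are illustrative):

```rust
use std::ops::RangeInclusive;

type BlockNumber = u64;
type TxNumber = u64;

/// Simplified local mirror of the new segment header (serde derives omitted).
struct SegmentHeader {
    block_range: RangeInclusive<BlockNumber>,
    tx_range: RangeInclusive<TxNumber>,
}

impl SegmentHeader {
    fn new(block_range: RangeInclusive<BlockNumber>, tx_range: RangeInclusive<TxNumber>) -> Self {
        Self { block_range, tx_range }
    }
    /// First block covered by the segment; row offsets are relative to this.
    fn block_start(&self) -> BlockNumber {
        *self.block_range.start()
    }
    /// First transaction covered by the segment.
    fn tx_start(&self) -> TxNumber {
        *self.tx_range.start()
    }
}

fn main() {
    // Hypothetical segment covering blocks 0..=499_999 and transactions 0..=1_233_456.
    let header = SegmentHeader::new(0..=499_999, 0..=1_233_456);
    // A provider turns an absolute block number into a row offset inside the jar:
    let block = 42u64;
    let row = (block - header.block_start()) as usize;
    println!("block {block} lives at row {row}, first tx is {}", header.tx_start());
}
```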


@ -1,6 +1,6 @@
use crate::{
compression::{TRANSACTION_COMPRESSOR, TRANSACTION_DECOMPRESSOR},
keccak256, Address, Bytes, TxHash, B256,
keccak256, Address, BlockHashOrNumber, Bytes, TxHash, B256,
};
use alloy_rlp::{
Decodable, Encodable, Error as RlpError, Header, EMPTY_LIST_CODE, EMPTY_STRING_CODE,
@ -1307,6 +1307,9 @@ impl IntoRecoveredTransaction for TransactionSignedEcRecovered {
}
}
/// Either a transaction hash or number.
pub type TxHashOrNumber = BlockHashOrNumber;
#[cfg(test)]
mod tests {
use crate::{


@ -1,6 +1,6 @@
use crate::segments::{prepare_jar, Segment};
use crate::segments::{prepare_jar, Segment, SegmentHeader};
use reth_db::{
cursor::DbCursorRO, snapshot::create_snapshot_T1_T2_T3, table::Table, tables,
cursor::DbCursorRO, database::Database, snapshot::create_snapshot_T1_T2_T3, tables,
transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::RethResult;
@ -8,6 +8,7 @@ use reth_primitives::{
snapshot::{Compression, Filters},
BlockNumber, SnapshotSegment,
};
use reth_provider::DatabaseProviderRO;
use std::ops::RangeInclusive;
/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
@ -22,28 +23,17 @@ impl Headers {
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
}
// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<T: Table<Key = BlockNumber>>(
&self,
tx: &impl DbTx,
range: &RangeInclusive<BlockNumber>,
range_len: usize,
) -> RethResult<Vec<Vec<u8>>> {
let mut cursor = tx.cursor_read::<RawTable<T>>()?;
Ok(cursor
.walk_back(Some(RawKey::from(*range.end())))?
.take(range_len.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>())
}
}
impl Segment for Headers {
fn snapshot(&self, tx: &impl DbTx, range: RangeInclusive<BlockNumber>) -> RethResult<()> {
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let range_len = range.clone().count();
let mut jar = prepare_jar::<3, tables::Headers>(
tx,
let mut jar = prepare_jar::<DB, 3>(
provider,
SnapshotSegment::Headers,
self.filters,
self.compression,
@ -51,17 +41,21 @@ impl Segment for Headers {
range_len,
|| {
Ok([
self.dataset_for_compression::<tables::Headers>(tx, &range, range_len)?,
self.dataset_for_compression::<tables::HeaderTD>(tx, &range, range_len)?,
self.dataset_for_compression::<tables::CanonicalHeaders>(
tx, &range, range_len,
self.dataset_for_compression::<DB, tables::Headers>(
provider, &range, range_len,
)?,
self.dataset_for_compression::<DB, tables::HeaderTD>(
provider, &range, range_len,
)?,
self.dataset_for_compression::<DB, tables::CanonicalHeaders>(
provider, &range, range_len,
)?,
])
},
)?;
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut hashes = None;
if self.filters.has_filters() {
hashes = Some(
@ -77,8 +71,9 @@ impl Segment for Headers {
tables::HeaderTD,
tables::CanonicalHeaders,
BlockNumber,
SegmentHeader,
>(
tx,
provider.tx_ref(),
range,
None,
// We already prepared the dictionary beforehand


@ -1,16 +1,24 @@
//! Snapshot segment implementations and utilities.
mod headers;
mod transactions;
pub use transactions::Transactions;
mod headers;
pub use headers::Headers;
use reth_db::{table::Table, transaction::DbTx};
mod receipts;
pub use receipts::Receipts;
use reth_db::{
cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::PathBuf};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
@ -18,22 +26,43 @@ pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
/// A segment represents a snapshotting of some portion of the data.
pub trait Segment {
/// Snapshot data using the provided range.
fn snapshot(&self, tx: &impl DbTx, range: RangeInclusive<BlockNumber>) -> RethResult<()>;
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()>;
/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
range: &RangeInclusive<u64>,
range_len: usize,
) -> RethResult<Vec<Vec<u8>>> {
let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;
Ok(cursor
.walk_back(Some(RawKey::from(*range.end())))?
.take(range_len.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>())
}
}
/// Returns a [`NippyJar`] according to the desired configuration.
pub(crate) fn prepare_jar<const COLUMNS: usize, T: Table>(
tx: &impl DbTx,
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<'_, DB>,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
range: RangeInclusive<BlockNumber>,
range_len: usize,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
) -> RethResult<NippyJar> {
let mut nippy_jar = NippyJar::new_without_header(
) -> RethResult<NippyJar<SegmentHeader>> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
&get_snapshot_segment_file_name(segment, filters, compression, &range),
&get_snapshot_segment_file_name(segment, filters, compression, &block_range),
SegmentHeader::new(block_range, tx_range),
);
nippy_jar = match compression {
@ -50,7 +79,6 @@ pub(crate) fn prepare_jar<const COLUMNS: usize, T: Table>(
};
if let Filters::WithFilters(inclusion_filter, phf) = filters {
let total_rows = (tx.entries::<T>()? - *range.start() as usize).min(range_len);
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};


@ -0,0 +1,74 @@
use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::ops::RangeInclusive;
/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug)]
pub struct Receipts {
compression: Compression,
filters: Filters,
}
impl Receipts {
/// Creates new instance of [Receipts] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
}
}
impl Segment for Receipts {
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let mut jar = prepare_jar::<DB, 1>(
provider,
SnapshotSegment::Receipts,
self.filters,
self.compression,
block_range,
tx_range_len,
|| {
Ok([self.dataset_for_compression::<DB, tables::Receipts>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let mut hashes = None;
if self.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
);
}
create_snapshot_T1::<tables::Receipts, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
&mut jar,
)?;
Ok(())
}
}


@ -0,0 +1,74 @@
use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::ops::RangeInclusive;
/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug)]
pub struct Transactions {
compression: Compression,
filters: Filters,
}
impl Transactions {
/// Creates new instance of [Transactions] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
}
}
impl Segment for Transactions {
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let mut jar = prepare_jar::<DB, 1>(
provider,
SnapshotSegment::Transactions,
self.filters,
self.compression,
block_range,
tx_range_len,
|| {
Ok([self.dataset_for_compression::<DB, tables::Transactions>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let mut hashes = None;
if self.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
);
}
create_snapshot_T1::<tables::Transactions, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
&mut jar,
)?;
Ok(())
}
}


@ -1,23 +1,20 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use itertools::Itertools;
use rayon::prelude::*;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_interfaces::provider::ProviderError;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PruneCheckpoint, PruneMode, PruneSegment, TransactionSignedNoHash, TxNumber, B256,
PruneCheckpoint, PruneMode, PruneSegment,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
TransactionsProviderExt,
};
use tokio::sync::mpsc;
use tracing::*;
/// The transaction lookup stage.
@ -93,49 +90,15 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
let tx_range_size = tx_range.clone().count();
debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup");
let tx = provider.tx_ref();
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::unbounded_channel();
channels.push(rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
// Iterate over channels and append the tx hashes to be sorted out later
for mut channel in channels {
while let Some(tx) = channel.recv().await {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
let mut tx_list = provider.transaction_hashes_by_range(tx_range)?;
// Sort before inserting the reverse lookup for hash -> tx_id.
tx_list.par_sort_unstable_by(|txa, txb| txa.0.cmp(&txb.0));
let tx = provider.tx_ref();
let mut txhash_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
// If the last inserted element in the database is equal or bigger than the first
@ -201,17 +164,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
/// Calculates the hash of the given transaction
#[inline]
fn calculate_hash(
entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<StageError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
Ok((keccak256(rlp_buf), tx_id))
}
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, StageError> {


@ -45,6 +45,8 @@ parking_lot.workspace = true
derive_more = "0.99"
eyre.workspace = true
paste = "1.0"
rayon.workspace = true
itertools.workspace = true
# arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true }


@ -9,6 +9,7 @@ use crate::{
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use reth_tracing::tracing::*;
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, ops::RangeInclusive};
/// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`] and
@ -34,7 +35,8 @@ macro_rules! generate_snapshot_func {
#[allow(non_snake_case)]
pub fn [<create_snapshot$(_ $tbl)+>]<
$($tbl: Table<Key=K>,)+
K
K,
H: for<'a> Deserialize<'a> + Send + Serialize + Sync + std::fmt::Debug
>
(
tx: &impl DbTx,
@ -43,7 +45,7 @@ macro_rules! generate_snapshot_func {
dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
row_count: usize,
nippy_jar: &mut NippyJar
nippy_jar: &mut NippyJar<H>
) -> RethResult<()>
where K: Key + Copy
{


@ -25,7 +25,7 @@ pub use traits::{
PruneCheckpointWriter, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader,
StageCheckpointWriter, StateProvider, StateProviderBox, StateProviderFactory,
StateRootProvider, StorageReader, TransactionVariant, TransactionsProvider,
WithdrawalsProvider,
TransactionsProviderExt, WithdrawalsProvider,
};
/// Provider trait implementations.


@ -6,7 +6,8 @@ use crate::{
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
Chain, EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, StageCheckpointReader,
StorageReader, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_db::{
@ -24,7 +25,7 @@ use reth_db::{
};
use reth_interfaces::{
executor::{BlockExecutionError, BlockValidationError},
RethResult,
RethError, RethResult,
};
use reth_primitives::{
keccak256,
@ -46,7 +47,7 @@ use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::Arc,
sync::{mpsc, Arc},
};
/// A [`DatabaseProvider`] that holds a read-only database transaction.
@ -1140,6 +1141,65 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
}
}
impl<TX: DbTx> TransactionsProviderExt for DatabaseProvider<TX> {
/// Recovers transaction hashes by walking through `Transactions` table and
/// calculating them in a parallel manner. Returned unsorted.
fn transaction_hashes_by_range(
&self,
tx_range: Range<TxNumber>,
) -> RethResult<Vec<(TxHash, TxNumber)>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let tx_range_size = tx_range.clone().count();
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
#[inline]
fn calculate_hash(
entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<RethError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
Ok((keccak256(rlp_buf), tx_id))
}
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::channel();
channels.push(rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
// Iterate over channels and append the tx hashes unsorted
for channel in channels {
while let Ok(tx) = channel.recv() {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
Ok(tx_list)
}
}
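
`transaction_hashes_by_range` moves the parallel hash recovery out of the transaction-lookup stage and into the provider: the transaction walk is chunked, each chunk is hashed on rayon's global pool, and results are funneled back through `std::sync::mpsc` channels unsorted. A stand-alone sketch of that fan-out/fan-in pattern, with `DefaultHasher` over strings standing in for `keccak256` over RLP-encoded transactions (all names below are illustrative):

```rust
use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
    sync::mpsc,
};

fn main() {
    // Stand-ins for the (TxNumber, TransactionSignedNoHash) rows walked from the table.
    let entries: Vec<(u64, String)> =
        (0u64..10_000).map(|id| (id, format!("tx-{id}"))).collect();

    let chunk_size = (entries.len() / rayon::current_num_threads()).max(1);
    let mut channels = Vec::new();

    for chunk in entries.chunks(chunk_size) {
        let (tx, rx) = mpsc::channel();
        channels.push(rx);
        // The chunk is copied so it can be moved onto rayon's global pool.
        let chunk = chunk.to_vec();
        rayon::spawn(move || {
            for (id, payload) in chunk {
                let mut hasher = DefaultHasher::new();
                payload.hash(&mut hasher);
                // Send (hash, id) back; in the commit this is (keccak256(rlp), tx_id).
                let _ = tx.send((hasher.finish(), id));
            }
        });
    }

    // Fan-in: collect results unsorted; callers sort by hash afterwards if they need to.
    let mut tx_list = Vec::with_capacity(entries.len());
    for rx in channels {
        while let Ok(pair) = rx.recv() {
            tx_list.push(pair);
        }
    }
    assert_eq!(tx_list.len(), entries.len());
    println!("hashed {} entries", tx_list.len());
}
```

Because the stage now only sorts and writes, the same hashing code also serves the snapshot segments that need hashes for filters and perfect hashing functions.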
/// Calculates the hash of the given transaction
impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
fn transaction_id(&self, tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
Ok(self.tx.get::<tables::TxHashNumber>(tx_hash)?)


@ -1,11 +1,15 @@
use crate::HeaderProvider;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_db::{
table::{Decompress, Table},
HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256};
use reth_primitives::{
snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
};
use std::ops::RangeBounds;
/// SnapshotProvider
@ -16,14 +20,12 @@ use std::ops::RangeBounds;
#[derive(Debug)]
pub struct SnapshotProvider<'a> {
/// NippyJar
pub jar: &'a NippyJar,
/// Starting snapshot block
pub jar_start_block: u64,
pub jar: &'a NippyJar<SegmentHeader>,
}
impl<'a> SnapshotProvider<'a> {
/// Creates cursor
pub fn cursor(&self) -> NippyJarCursor<'a> {
pub fn cursor(&self) -> NippyJarCursor<'a, SegmentHeader> {
NippyJarCursor::new(self.jar, None).unwrap()
}
@ -31,7 +33,7 @@ impl<'a> SnapshotProvider<'a> {
pub fn cursor_with_decompressors(
&self,
decompressors: Vec<Decompressor<'a>>,
) -> NippyJarCursor<'a> {
) -> NippyJarCursor<'a, SegmentHeader> {
NippyJarCursor::new(self.jar, Some(decompressors)).unwrap()
}
}
@ -57,7 +59,9 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()
.row_by_number_with_cols::<0b01, 2>((num - self.jar_start_block) as usize)?
.row_by_number_with_cols::<0b01, 2>(
(num - self.jar.user_header().block_start()) as usize,
)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
@ -101,6 +105,122 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
}
}
impl<'a> BlockHashReader for SnapshotProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl<'a> BlockNumReader for SnapshotProvider<'a> {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl<'a> TransactionsProvider for SnapshotProvider<'a> {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
self.cursor()
.row_by_number_with_cols::<0b1, 1>(
(num - self.jar.user_header().tx_start()) as usize,
)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
.map_err(Into::into)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
// WIP
let mut cursor = self.cursor();
let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
)
.unwrap()
.with_hash();
if tx.hash() == hash {
return Ok(Some(tx))
} else {
// check next snapshot
}
Ok(None)
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}
#[cfg(test)]
mod test {
use super::*;
@ -123,6 +243,7 @@ mod test {
// Ranges
let row_count = 100u64;
let range = 0..=(row_count - 1);
let segment_header = SegmentHeader::new(range.clone(), range.clone());
// Data sources
let db = create_test_rw_db();
@ -157,7 +278,7 @@ mod test {
let with_compression = true;
let with_filter = true;
let mut nippy_jar = NippyJar::new_without_header(2, snap_file.path());
let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header);
if with_compression {
nippy_jar = nippy_jar.with_zstd(false, 0);
@ -180,7 +301,7 @@ mod test {
.unwrap()
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber, SegmentHeader>(
&tx,
range,
None,
@ -194,10 +315,10 @@ mod test {
// Use providers to query Header data and compare if it matches
{
let jar = NippyJar::load_without_header(snap_file.path()).unwrap();
let jar = NippyJar::load(snap_file.path()).unwrap();
let db_provider = factory.provider().unwrap();
let snap_provider = SnapshotProvider { jar: &jar, jar_start_block: 0 };
let snap_provider = SnapshotProvider { jar: &jar };
assert!(!headers.is_empty());


@ -37,7 +37,7 @@ pub use state::{
};
mod transactions;
pub use transactions::TransactionsProvider;
pub use transactions::{TransactionsProvider, TransactionsProviderExt};
mod withdrawals;
pub use withdrawals::WithdrawalsProvider;


@ -1,10 +1,10 @@
use crate::BlockNumReader;
use reth_interfaces::RethResult;
use crate::{BlockNumReader, BlockReader};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{
Address, BlockHashOrNumber, BlockNumber, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber,
};
use std::ops::RangeBounds;
use std::ops::{Range, RangeBounds, RangeInclusive};
/// Client trait for fetching [TransactionSigned] related data.
#[auto_impl::auto_impl(&, Arc)]
@ -63,3 +63,31 @@ pub trait TransactionsProvider: BlockNumReader + Send + Sync {
/// Returns None if the transaction is not found.
fn transaction_sender(&self, id: TxNumber) -> RethResult<Option<Address>>;
}
/// Client trait for fetching additional [TransactionSigned] related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionsProviderExt: BlockReader + Send + Sync {
/// Get transactions range by block range.
fn transaction_range_by_block_range(
&self,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<RangeInclusive<TxNumber>> {
let from = self
.block_body_indices(*block_range.start())?
.ok_or(ProviderError::BlockBodyIndicesNotFound(*block_range.start()))?
.first_tx_num();
let to = self
.block_body_indices(*block_range.end())?
.ok_or(ProviderError::BlockBodyIndicesNotFound(*block_range.end()))?
.last_tx_num();
Ok(from..=to)
}
/// Get transaction hashes from a transaction range.
fn transaction_hashes_by_range(
&self,
tx_range: Range<TxNumber>,
) -> RethResult<Vec<(TxHash, TxNumber)>>;
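
`transaction_range_by_block_range` ships with a default implementation: look up the body indices of the first and last block, then span from the first block's first transaction to the last block's last transaction. A self-contained sketch of that mapping (the `BodyIndices` type is a simplified stand-in for reth's stored block body indices, and the sample numbers are made up):

```rust
use std::{collections::BTreeMap, ops::RangeInclusive};

type BlockNumber = u64;
type TxNumber = u64;

/// Stand-in for a stored block body index: first transaction number and count.
#[derive(Clone, Copy)]
struct BodyIndices {
    first_tx_num: TxNumber,
    tx_count: u64,
}

impl BodyIndices {
    fn last_tx_num(&self) -> TxNumber {
        self.first_tx_num + self.tx_count.saturating_sub(1)
    }
}

/// Same idea as the new default method: span from the first tx of the first block
/// to the last tx of the last block.
fn transaction_range_by_block_range(
    indices: &BTreeMap<BlockNumber, BodyIndices>,
    block_range: RangeInclusive<BlockNumber>,
) -> Option<RangeInclusive<TxNumber>> {
    let from = indices.get(block_range.start())?.first_tx_num;
    let to = indices.get(block_range.end())?.last_tx_num();
    Some(from..=to)
}

fn main() {
    let mut indices = BTreeMap::new();
    indices.insert(0, BodyIndices { first_tx_num: 0, tx_count: 3 });
    indices.insert(1, BodyIndices { first_tx_num: 3, tx_count: 2 });
    indices.insert(2, BodyIndices { first_tx_num: 5, tx_count: 4 });
    assert_eq!(transaction_range_by_block_range(&indices, 0..=2), Some(0..=8));
    println!("blocks 0..=2 cover txs 0..=8");
}
```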
}