chore: remove reth db create-static-files command (#8554)

Author: joshieDo
Date: 2024-06-03 14:56:54 +02:00 (committed by GitHub)
Parent: db3d1335a8
Commit: 34af610b8e
10 changed files with 10 additions and 917 deletions

View File

@@ -25,7 +25,6 @@ mod clear;
 mod diff;
 mod get;
 mod list;
-mod static_files;
 mod stats;
 /// DB List TUI
 mod tui;
@@ -84,8 +83,6 @@ pub enum Subcommands {
     },
     /// Deletes all table entries
     Clear(clear::Command),
-    /// Creates static files from database tables
-    CreateStaticFiles(static_files::Command),
     /// Lists current and local database versions
     Version,
     /// Returns the full database path
@@ -176,9 +173,6 @@ impl Command {
                 command.execute(provider_factory)?;
             }
-            Subcommands::CreateStaticFiles(command) => {
-                command.execute(data_dir, self.db.database_args(), self.chain.clone())?;
-            }
             Subcommands::Version => {
                 let local_db_version = match get_db_version(&db_path) {
                     Ok(version) => Some(version),
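
For context, the two hunks above delete one variant from a clap subcommand enum together with its dispatch arm in `execute`. A minimal standalone sketch of that enum-plus-match shape, assuming clap 4's derive API (the `Greet`/`Version` subcommands are hypothetical, not reth's CLI):

```rust
use clap::{Parser, Subcommand};

/// Hypothetical CLI mirroring the `Subcommands` enum above.
#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Greets the given name
    Greet { name: String },
    /// Prints a version string
    Version,
}

fn main() {
    match Cli::parse().command {
        // Removing a subcommand means deleting both its enum variant
        // and the matching arm here, exactly as the hunks above do.
        Commands::Greet { name } => println!("hello, {name}"),
        Commands::Version => println!("0.1.0"),
    }
}
```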

View File

@@ -1,54 +0,0 @@
use reth_db::DatabaseEnv;
use reth_primitives::{
static_file::{Compression, Filters},
StaticFileSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{fmt::Debug, sync::Arc, time::Instant};
#[derive(Debug)]
pub(crate) enum BenchKind {
Walk,
RandomAll,
RandomOne,
RandomHash,
}
pub(crate) fn bench<F1, F2, R>(
bench_kind: BenchKind,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
segment: StaticFileSegment,
filters: Filters,
compression: Compression,
mut static_file_method: F1,
database_method: F2,
) -> eyre::Result<()>
where
F1: FnMut() -> eyre::Result<R>,
F2: Fn(DatabaseProviderRO<DatabaseEnv>) -> eyre::Result<R>,
R: Debug + PartialEq,
{
println!();
println!("############");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
let static_file_result = {
let start = Instant::now();
let result = static_file_method()?;
let end = start.elapsed().as_micros();
println!("# static file {bench_kind:?} | {end} μs");
result
};
let db_result = {
let provider = provider_factory.provider()?;
let start = Instant::now();
let result = database_method(provider)?;
let end = start.elapsed().as_micros();
println!("# database {bench_kind:?} | {end} μs");
result
};
assert_eq!(static_file_result, db_result);
Ok(())
}
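
The removed `bench` helper above is a correctness-checked timing harness: it runs the static-file closure, then the database closure, prints elapsed microseconds for each, and asserts both paths returned identical results. A self-contained sketch of the same pattern with plain closures in place of reth's providers (all names illustrative):

```rust
use std::time::Instant;

/// Times two interchangeable implementations and checks they agree,
/// mirroring the shape of the removed `bench` helper.
fn bench_pair<R, F1, F2>(label: &str, mut fast: F1, slow: F2) -> Result<(), String>
where
    R: std::fmt::Debug + PartialEq,
    F1: FnMut() -> Result<R, String>,
    F2: Fn() -> Result<R, String>,
{
    let start = Instant::now();
    let a = fast()?;
    println!("# fast {label} | {} μs", start.elapsed().as_micros());

    let start = Instant::now();
    let b = slow()?;
    println!("# slow {label} | {} μs", start.elapsed().as_micros());

    // The timings are only comparable if both paths produce the same answer.
    assert_eq!(a, b);
    Ok(())
}

fn main() -> Result<(), String> {
    let data: Vec<u64> = (0..1_000_000).collect();
    bench_pair(
        "sum",
        || Ok(data.iter().sum::<u64>()),
        || Ok(data.iter().fold(0u64, |acc, x| acc + x)),
    )
}
```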

View File

@@ -1,133 +0,0 @@
use super::{
bench::{bench, BenchKind},
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{static_file::HeaderMask, DatabaseEnv};
use reth_primitives::{
static_file::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockHash, Header, StaticFileSegment,
};
use reth_provider::{
providers::StaticFileProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
};
use std::{ops::RangeInclusive, path::PathBuf, sync::Arc};
impl Command {
pub(crate) fn bench_headers_static_file(
&self,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range = *self.block_ranges(tip).first().expect("has been generated before");
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let range: RangeInclusive<u64> = (&block_range).into();
let mut row_indexes = range.collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let path: PathBuf = StaticFileSegment::Headers
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = StaticFileProvider::read_only(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
StaticFileSegment::Headers,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
provider_factory.clone(),
StaticFileSegment::Headers,
filters,
compression,
|| {
for num in row_indexes.iter() {
cursor
.get_one::<HeaderMask<Header>>((*num).into())?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?;
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.header_by_number(*num)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?;
}
Ok(())
},
)?;
// Shuffle so the next pass (RandomAll) reads the rows in random order
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM HEADER BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
provider_factory.clone(),
StaticFileSegment::Headers,
filters,
compression,
|| {
Ok(cursor
.get_one::<HeaderMask<Header>>(num.into())?
.ok_or(ProviderError::HeaderNotFound(num.into()))?)
},
|provider| {
Ok(provider
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?)
},
)?;
}
// BENCHMARK QUERYING A RANDOM HEADER BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
let header_hash = provider_factory
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();
bench(
BenchKind::RandomHash,
provider_factory,
StaticFileSegment::Headers,
filters,
compression,
|| {
let (header, hash) = cursor
.get_two::<HeaderMask<Header, BlockHash>>((&header_hash).into())?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?;
// Might be a false positive, so in the real world we have to validate it
assert_eq!(hash, header_hash);
Ok(header)
},
|provider| {
Ok(provider
.header(&header_hash)?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?)
},
)?;
}
Ok(())
}
}
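
One subtlety in the hash-lookup benchmark above: a perfect hash function only guarantees collision-free slots for the keys it was built over, so an unknown key can still map to a valid slot, which is why the code fetches the stored `BlockHash` alongside the header and `assert_eq!`s it against the query. A toy sketch of that verify-after-lookup discipline, with a plain vector standing in for the jar cursor (illustrative only):

```rust
/// A toy "PHF-indexed" store: `index` maps *any* key to some slot,
/// so every hit must be re-verified against the stored key.
struct ToyStore {
    rows: Vec<(u64, String)>, // (key, payload), placed at key % capacity
}

impl ToyStore {
    /// Stand-in for a PHF lookup.
    fn index(&self, key: u64) -> usize {
        (key as usize) % self.rows.len()
    }

    fn get(&self, key: u64) -> Option<&String> {
        let (stored_key, value) = &self.rows[self.index(key)];
        // Reject false positives: trust the slot only if the stored
        // key matches the one we asked for.
        (*stored_key == key).then_some(value)
    }
}

fn main() {
    // Keys 12, 10, 11 hash to slots 0, 1, 2 respectively (key % 3).
    let store = ToyStore {
        rows: vec![(12, "c".into()), (10, "a".into()), (11, "b".into())],
    };
    assert_eq!(store.get(11), Some(&"b".to_string()));
    assert_eq!(store.get(14), None); // slot 2 holds key 11, not 14
}
```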

View File

@@ -1,267 +0,0 @@
use clap::{builder::RangedU64ValueParser, Parser};
use human_bytes::human_bytes;
use itertools::Itertools;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_db::{
database::Database,
mdbx::{DatabaseArguments, MaxReadTransactionDuration},
open_db_read_only, DatabaseEnv,
};
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_primitives::{
static_file::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig,
SegmentHeader, SegmentRangeInclusive,
},
BlockNumber, ChainSpec, StaticFileSegment,
};
use reth_provider::{providers::StaticFileProvider, BlockNumReader, ProviderFactory};
use reth_static_file::{segments as static_file_segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
mod bench;
mod headers;
mod receipts;
mod transactions;
/// Arguments for the `reth db create-static-files` command.
#[derive(Parser, Debug)]
pub struct Command {
/// Static File segments to generate.
segments: Vec<StaticFileSegment>,
/// Starting block for the static file.
#[arg(long, short, default_value = "0")]
from: BlockNumber,
/// Number of blocks in the static file.
#[arg(long, short, default_value = "500000")]
block_interval: u64,
/// Sets the number of static files built in parallel. Note: Each parallel build is
/// memory-intensive.
#[arg(
long, short,
default_value = "1",
value_parser = RangedU64ValueParser::<u64>::new().range(1..)
)]
parallel: u64,
/// Flag to skip static file creation and only print stats for existing static files.
#[arg(long, default_value = "false")]
only_stats: bool,
/// Flag to enable database-to-static file benchmarking.
#[arg(long, default_value = "false")]
bench: bool,
/// Flag to skip static file creation and only run benchmarks on existing static files.
#[arg(long, default_value = "false")]
only_bench: bool,
/// Compression algorithms to use.
#[arg(long, short, value_delimiter = ',', default_value = "uncompressed")]
compression: Vec<Compression>,
/// Flag to enable inclusion list filters and PHFs.
#[arg(long, default_value = "false")]
with_filters: bool,
/// Specifies the perfect hashing functions to use.
#[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "fmph"))]
phf: Vec<PerfectHashingFunction>,
}
impl Command {
/// Executes the `db create-static-files` command.
pub fn execute(
self,
data_dir: ChainPath<DataDirPath>,
db_args: DatabaseArguments,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
let all_combinations = self
.segments
.iter()
.cartesian_product(self.compression.iter().copied())
.cartesian_product(if self.phf.is_empty() {
vec![None]
} else {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
});
let db = open_db_read_only(
data_dir.db().as_path(),
db_args.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)?;
let provider_factory = Arc::new(ProviderFactory::new(
db,
chain,
StaticFileProvider::read_write(data_dir.static_files())?,
));
{
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Filters::WithFilters(InclusionFilter::Cuckoo, phf)
} else {
Filters::WithoutFilters
};
match mode {
StaticFileSegment::Headers => self.generate_static_file::<DatabaseEnv>(
provider_factory.clone(),
static_file_segments::Headers,
SegmentConfig { filters, compression },
)?,
StaticFileSegment::Transactions => self
.generate_static_file::<DatabaseEnv>(
provider_factory.clone(),
static_file_segments::Transactions,
SegmentConfig { filters, compression },
)?,
StaticFileSegment::Receipts => self.generate_static_file::<DatabaseEnv>(
provider_factory.clone(),
static_file_segments::Receipts,
SegmentConfig { filters, compression },
)?,
}
}
}
}
if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
match mode {
StaticFileSegment::Headers => self.bench_headers_static_file(
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
StaticFileSegment::Transactions => self.bench_transactions_static_file(
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
StaticFileSegment::Receipts => self.bench_receipts_static_file(
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
}
}
}
Ok(())
}
/// Generates successive inclusive block ranges up to the tip starting at `self.from`.
fn block_ranges(&self, tip: BlockNumber) -> Vec<SegmentRangeInclusive> {
let mut from = self.from;
let mut ranges = Vec::new();
while from <= tip {
let end_range = std::cmp::min(from + self.block_interval - 1, tip);
ranges.push(SegmentRangeInclusive::new(from, end_range));
from = end_range + 1;
}
ranges
}
/// Generates static files starting at `self.from`, in chunks of `self.block_interval`
/// blocks, building them in parallel when `--parallel` is greater than one.
fn generate_static_file<DB: Database>(
&self,
factory: Arc<ProviderFactory<DB>>,
segment: impl Segment<DB>,
config: SegmentConfig,
) -> eyre::Result<()> {
let dir = PathBuf::default();
let ranges = self.block_ranges(factory.best_block_number()?);
let mut created_static_files = vec![];
// Filter/PHF is memory intensive, so we have to limit the parallelism.
for block_ranges in ranges.chunks(self.parallel as usize) {
let created_files = block_ranges
.into_par_iter()
.map(|block_range| {
let provider = factory.provider()?;
if !self.only_stats {
segment.create_static_file_file(
&provider,
dir.as_path(),
config,
block_range.into(),
)?;
}
Ok(segment.segment().filename(block_range))
})
.collect::<Result<Vec<_>, eyre::Report>>()?;
created_static_files.extend(created_files);
}
self.stats(created_static_files)
}
/// Prints detailed statistics for each static file, including loading time.
///
/// This function loads each static file from the provided paths and prints
/// statistics about various aspects of each static file, such as filters size,
/// offset index size, offset list size, and loading time.
fn stats(&self, static_files: Vec<impl AsRef<Path>>) -> eyre::Result<()> {
let mut total_filters_size = 0;
let mut total_index_size = 0;
let mut total_duration = Duration::new(0, 0);
let mut total_file_size = 0;
for snap in &static_files {
let start_time = Instant::now();
let jar = NippyJar::<SegmentHeader>::load(snap.as_ref())?;
let _cursor = NippyJarCursor::new(&jar)?;
let duration = start_time.elapsed();
let file_size = snap.as_ref().metadata()?.len();
total_filters_size += jar.filter_size();
total_index_size += jar.offsets_index_size();
total_duration += duration;
total_file_size += file_size;
println!("StaticFile: {:?}", snap.as_ref().file_name());
println!(" File Size: {:>7}", human_bytes(file_size as f64));
println!(" Filters Size: {:>7}", human_bytes(jar.filter_size() as f64));
println!(" Offset Index Size: {:>7}", human_bytes(jar.offsets_index_size() as f64));
println!(
" Loading Time: {:>7.2} ms | {:>7.2} µs",
duration.as_millis() as f64,
duration.as_micros() as f64
);
}
let avg_duration = total_duration / static_files.len() as u32;
println!("Total Filters Size: {:>7}", human_bytes(total_filters_size as f64));
println!("Total Offset Index Size: {:>7}", human_bytes(total_index_size as f64));
println!("Total File Size: {:>7}", human_bytes(total_file_size as f64));
println!(
"Average Loading Time: {:>7.2} ms | {:>7.2} µs",
avg_duration.as_millis() as f64,
avg_duration.as_micros() as f64
);
Ok(())
}
}
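
The `block_ranges` helper above does the chunking for everything else: successive inclusive ranges of `block_interval` blocks starting at `from`, with the last range clamped to the tip. A standalone sketch of the same arithmetic using plain `(u64, u64)` pairs instead of `SegmentRangeInclusive`:

```rust
/// Mirrors the removed `block_ranges`: inclusive ranges of `interval`
/// blocks from `from` up to `tip`, the final one clamped to `tip`.
fn block_ranges(mut from: u64, interval: u64, tip: u64) -> Vec<(u64, u64)> {
    let mut ranges = Vec::new();
    while from <= tip {
        let end = std::cmp::min(from + interval - 1, tip);
        ranges.push((from, end));
        from = end + 1;
    }
    ranges
}

fn main() {
    // With the command's defaults (`--from 0`, `--block-interval 500000`)
    // and a hypothetical tip of 1,200,000:
    assert_eq!(
        block_ranges(0, 500_000, 1_200_000),
        vec![(0, 499_999), (500_000, 999_999), (1_000_000, 1_200_000)],
    );
}
```

`generate_static_file` then walks these ranges in `chunks(self.parallel)` and hands each chunk to rayon, so the `--parallel` flag bounds how many memory-hungry filter/PHF builds run at once.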

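The `execute` driver above enumerates every (segment, compression, PHF) combination with itertools' `cartesian_product`, substituting `vec![None]` when no PHF was requested so the product is never empty. A self-contained sketch of that enumeration, assuming the itertools crate the removed code already depends on (string stand-ins for reth's enums):

```rust
use itertools::Itertools;

fn main() {
    let segments = ["headers", "transactions", "receipts"];
    let compression = ["uncompressed", "zstd"];
    // An empty PHF list must still yield one combination, hence `vec![None]`.
    let phfs: Vec<&str> = vec![];
    let phf_options: Vec<Option<&str>> = if phfs.is_empty() {
        vec![None]
    } else {
        phfs.into_iter().map(Some).collect()
    };

    // Same nesting as the removed code: ((segment, compression), phf).
    for ((segment, compression), phf) in segments
        .iter()
        .cartesian_product(compression.iter())
        .cartesian_product(phf_options.iter())
    {
        println!("{segment} | {compression} | {phf:?}");
    }
}
```
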
View File

@@ -1,133 +0,0 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{static_file::ReceiptMask, DatabaseEnv};
use reth_primitives::{
static_file::{Filters, InclusionFilter},
Receipt, StaticFileSegment,
};
use reth_provider::{
providers::StaticFileProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider,
TransactionsProvider, TransactionsProviderExt,
};
use std::{path::PathBuf, sync::Arc};
impl Command {
pub(crate) fn bench_receipts_static_file(
&self,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range = *self.block_ranges(tip).first().expect("has been generated before");
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let mut rng = rand::thread_rng();
let tx_range =
provider_factory.provider()?.transaction_range_by_block_range(block_range.into())?;
let mut row_indexes = tx_range.collect::<Vec<_>>();
let path: PathBuf = StaticFileSegment::Receipts
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = StaticFileProvider::read_only(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
StaticFileSegment::Receipts,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
provider_factory.clone(),
StaticFileSegment::Receipts,
filters,
compression,
|| {
for num in row_indexes.iter() {
cursor
.get_one::<ReceiptMask<Receipt>>((*num).into())?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.receipt(*num)?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
}
Ok(())
},
)?;
// Shuffle so the next pass (RandomAll) reads the rows in random order
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM RECEIPT BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
provider_factory.clone(),
StaticFileSegment::Receipts,
filters,
compression,
|| {
Ok(cursor
.get_one::<ReceiptMask<Receipt>>(num.into())?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?)
},
|provider| {
Ok(provider
.receipt(num as u64)?
.ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?)
},
)?;
}
// BENCHMARK QUERYING A RANDOM RECEIPT BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let tx_hash = provider_factory
.transaction_by_id(num)?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?
.hash();
bench(
BenchKind::RandomHash,
provider_factory,
StaticFileSegment::Receipts,
filters,
compression,
|| {
Ok(cursor
.get_one::<ReceiptMask<Receipt>>((&tx_hash).into())?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?)
},
|provider| {
Ok(provider
.receipt_by_hash(tx_hash)?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?)
},
)?;
}
Ok(())
}
}

View File

@@ -1,137 +0,0 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{static_file::TransactionMask, DatabaseEnv};
use reth_primitives::{
static_file::{Filters, InclusionFilter},
StaticFileSegment, TransactionSignedNoHash,
};
use reth_provider::{
providers::StaticFileProvider, BlockNumReader, ProviderError, ProviderFactory,
TransactionsProvider, TransactionsProviderExt,
};
use std::{path::PathBuf, sync::Arc};
impl Command {
pub(crate) fn bench_transactions_static_file(
&self,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range = *self.block_ranges(tip).first().expect("has been generated before");
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let mut rng = rand::thread_rng();
let tx_range = provider.transaction_range_by_block_range(block_range.into())?;
let mut row_indexes = tx_range.collect::<Vec<_>>();
let path: PathBuf = StaticFileSegment::Transactions
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = StaticFileProvider::read_only(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
StaticFileSegment::Transactions,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
provider_factory.clone(),
StaticFileSegment::Transactions,
filters,
compression,
|| {
for num in row_indexes.iter() {
cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>((*num).into())?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?
.with_hash();
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.transaction_by_id(*num)?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?;
}
Ok(())
},
)?;
// Shuffle so the next pass (RandomAll) reads the rows in random order
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM TRANSACTION BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
provider_factory.clone(),
StaticFileSegment::Transactions,
filters,
compression,
|| {
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())?
.ok_or(ProviderError::TransactionNotFound(num.into()))?
.with_hash())
},
|provider| {
Ok(provider
.transaction_by_id(num as u64)?
.ok_or(ProviderError::TransactionNotFound((num as u64).into()))?)
},
)?;
}
// BENCHMARK QUERYING A RANDOM TRANSACTION BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let transaction_hash = provider_factory
.transaction_by_id(num)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?
.hash();
bench(
BenchKind::RandomHash,
provider_factory,
StaticFileSegment::Transactions,
filters,
compression,
|| {
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(
(&transaction_hash).into(),
)?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?
.with_hash())
},
|provider| {
Ok(provider
.transaction_by_hash(transaction_hash)?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?)
},
)?;
}
Ok(())
}
}