feat: refactor generation of snapshots from the cli (#5464)

This commit is contained in:
joshieDo
2023-11-24 18:02:14 +00:00
committed by GitHub
parent 269073b2d4
commit af88851723
15 changed files with 225 additions and 158 deletions

2
Cargo.lock generated
View File

@ -5581,6 +5581,7 @@ dependencies = [
"pretty_assertions",
"proptest",
"rand 0.8.5",
"rayon",
"reth-auto-seal-consensus",
"reth-basic-payload-builder",
"reth-beacon-consensus",
@ -6378,6 +6379,7 @@ dependencies = [
"reth-rpc-api",
"reth-rpc-types",
"serde_json",
"similar-asserts",
"tokio",
]

View File

@ -103,6 +103,7 @@ humantime = "2.1.0"
const-str = "0.5.6"
boyer-moore-magiclen = "0.2.16"
itertools.workspace = true
rayon.workspace = true
[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }

View File

@ -3,56 +3,22 @@ use super::{
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, snapshot::HeaderMask};
use reth_db::{open_db_read_only, snapshot::HeaderMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockHash, ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError,
ProviderFactory, TransactionsProviderExt,
providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
impl Command {
pub(crate) fn generate_headers_snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let range = self.block_range();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let segment = segments::Headers::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Headers.filename(&range, &tx_range),
SnapshotSegment::Headers.filename_with_configuration(
filters,
compression,
&range,
&tx_range,
),
)?;
Ok(())
}
pub(crate) fn bench_headers_snapshot(
&self,
db_path: &Path,
@ -62,14 +28,18 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone());
let provider = factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let block_range = self.block_range();
let mut row_indexes = block_range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();

View File

@ -1,13 +1,21 @@
use clap::Parser;
use clap::{builder::RangedU64ValueParser, Parser};
use itertools::Itertools;
use reth_db::{open_db_read_only, DatabaseEnv};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_db::{database::Database, open_db_read_only, DatabaseEnv};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::ProviderFactory;
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use reth_provider::{BlockNumReader, ProviderFactory, TransactionsProviderExt};
use reth_snapshot::{segments as snap_segments, segments::Segment};
use std::{
ops::RangeInclusive,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
mod bench;
mod headers;
@ -28,6 +36,19 @@ pub struct Command {
#[arg(long, short, default_value = "500000")]
block_interval: u64,
/// Sets the number of snapshots built in parallel. Note: Each parallel build is
/// memory-intensive.
#[arg(
long, short,
default_value = "1",
value_parser = RangedU64ValueParser::<u64>::new().range(1..)
)]
parallel: u64,
/// Flag to skip snapshot creation and print snapshot files stats.
#[arg(long, default_value = "false")]
only_stats: bool,
/// Flag to enable database-to-snapshot benchmarking.
#[arg(long, default_value = "false")]
bench: bool,
@ -41,7 +62,7 @@ pub struct Command {
compression: Vec<Compression>,
/// Flag to enable inclusion list filters and PHFs.
#[arg(long, default_value = "true")]
#[arg(long, default_value = "false")]
with_filters: bool,
/// Specifies the perfect hashing function to use.
@ -65,39 +86,36 @@ impl Command {
{
let db = open_db_read_only(db_path, None)?;
let factory = ProviderFactory::new(db, chain.clone());
let provider = factory.provider()?;
let factory = Arc::new(ProviderFactory::new(db, chain.clone()));
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
let filters = if self.with_filters {
Filters::WithFilters(InclusionFilter::Cuckoo, *phf)
} else {
Filters::WithoutFilters
};
match mode {
SnapshotSegment::Headers => self.generate_headers_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Headers::new(*compression, filters),
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Transactions::new(*compression, filters),
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Receipts::new(*compression, filters),
)?,
SnapshotSegment::Transactions => self
.generate_transactions_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Receipts => self
.generate_receipts_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
}
}
}
}
if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
@ -130,8 +148,105 @@ impl Command {
Ok(())
}
/// Gives out the inclusive block range for the snapshot requested by the user.
fn block_range(&self) -> RangeInclusive<BlockNumber> {
self.from..=(self.from + self.block_interval - 1)
/// Generates successive inclusive block ranges up to the tip starting at `self.from`.
///
/// Each range covers at most `self.block_interval` blocks and the final range is
/// truncated at `tip`. Returns an empty list when `self.from > tip`.
///
/// A `block_interval` of zero is clamped to one block per range: the naive
/// `from + interval - 1` would underflow (wrapping to `from - 1` in release builds and
/// looping forever; panicking in debug builds). Additions saturate/are checked so
/// values near `u64::MAX` cannot overflow either.
fn block_ranges(&self, tip: BlockNumber) -> Vec<RangeInclusive<BlockNumber>> {
    // Treat a (mis)configured interval of 0 as 1 so every iteration makes progress.
    let interval = self.block_interval.max(1);
    let mut ranges = Vec::new();
    let mut from = self.from;
    while from <= tip {
        let end_range = from.saturating_add(interval - 1).min(tip);
        ranges.push(from..=end_range);
        // `end_range == u64::MAX` implies `end_range == tip`; stop instead of wrapping.
        from = match end_range.checked_add(1) {
            Some(next) => next,
            None => break,
        };
    }
    ranges
}
/// Generates snapshots from `self.from` with a `self.block_interval`. Generates them in
/// parallel if specified.
/// Generates snapshots from `self.from` with a `self.block_interval`. Generates them in
/// parallel if specified.
///
/// For every block range produced by `Self::block_ranges` (bounded by the provider's
/// last block number), snapshots the given `segment` and finally prints per-file
/// statistics via `self.stats`.
///
/// When `self.only_stats` is set, the snapshot build step is skipped and only the
/// expected filenames are collected for the stats report — NOTE(review): this assumes
/// the files already exist on disk from a previous run; confirm.
///
/// Returns an error if opening a provider, building a snapshot, resolving a
/// transaction range, or computing stats fails.
fn generate_snapshot<DB: Database>(
    &self,
    factory: Arc<ProviderFactory<DB>>,
    segment: impl Segment + Send + Sync,
) -> eyre::Result<()> {
    // `PathBuf::default()` is the empty path, so output files land relative to the
    // process' working directory — confirm this is the intended destination.
    let dir = PathBuf::default();
    let ranges = self.block_ranges(factory.last_block_number()?);

    let mut created_snapshots = vec![];

    // Filter/PHF is memory intensive, so we have to limit the parallelism.
    for block_ranges in ranges.chunks(self.parallel as usize) {
        let created_files = block_ranges
            .into_par_iter()
            .map(|block_range| {
                // Each parallel build gets its own read-only provider.
                let provider = factory.provider()?;

                if !self.only_stats {
                    segment.snapshot::<DB>(&provider, &dir, block_range.clone())?;
                }

                // The snapshot filename is derived from both the block range and the
                // corresponding transaction range.
                let tx_range =
                    provider.transaction_range_by_block_range(block_range.clone())?;

                Ok(segment.segment().filename(block_range, &tx_range))
            })
            // Short-circuits the whole chunk on the first failure.
            .collect::<Result<Vec<_>, eyre::Report>>()?;

        created_snapshots.extend(created_files);
    }

    self.stats(created_snapshots)
}
/// Prints detailed statistics for each snapshot, including loading time.
///
/// This function loads each snapshot from the provided paths and prints
/// statistics about various aspects of each snapshot, such as filters size,
/// offset index size, offset list size, and loading time.
/// Prints detailed statistics for each snapshot, including loading time.
///
/// This function loads each snapshot from the provided paths and prints
/// statistics about various aspects of each snapshot, such as filters size,
/// offset index size, offset list size, and loading time.
///
/// Returns an error if a snapshot file cannot be loaded or its filesystem
/// metadata cannot be read.
fn stats(&self, snapshots: Vec<impl AsRef<Path>>) -> eyre::Result<()> {
    let mb = 1024.0 * 1024.0;

    // Nothing to report (e.g. `from` was beyond the tip, so no ranges were
    // generated). Bail out early: the average below divides `total_duration`
    // by `snapshots.len()`, and `Duration / 0` panics.
    if snapshots.is_empty() {
        println!("No snapshots to report statistics for.");
        return Ok(())
    }

    let mut total_filters_size = 0;
    let mut total_index_size = 0;
    let mut total_offsets_size = 0;
    let mut total_duration = Duration::new(0, 0);
    let mut total_file_size = 0;

    for snap in &snapshots {
        // Time how long loading (deserializing) the snapshot takes.
        let start_time = Instant::now();
        let jar = NippyJar::<SegmentHeader>::load(snap.as_ref())?;
        let duration = start_time.elapsed();
        let file_size = snap.as_ref().metadata()?.len();

        total_filters_size += jar.filter_size();
        total_index_size += jar.offsets_index_size();
        total_offsets_size += jar.offsets_size();
        total_duration += duration;
        total_file_size += file_size;

        println!("Snapshot: {:?}", snap.as_ref().file_name());
        println!(" File Size: {:>7.2} MB", file_size as f64 / mb);
        println!(" Filters Size: {:>7.2} MB", jar.filter_size() as f64 / mb);
        println!(" Offset Index Size: {:>7.2} MB", jar.offsets_index_size() as f64 / mb);
        println!(" Offset List Size: {:>7.2} MB", jar.offsets_size() as f64 / mb);
        println!(
            " Loading Time: {:>7.2} ms | {:>7.2} µs",
            duration.as_millis() as f64,
            duration.as_micros() as f64
        );
    }

    // `len()` is non-zero here (guard above), so the division cannot panic.
    let avg_duration = total_duration / snapshots.len() as u32;

    println!("Total Filters Size: {:>7.2} MB", total_filters_size as f64 / mb);
    println!("Total Offset Index Size: {:>7.2} MB", total_index_size as f64 / mb);
    println!("Total Offset List Size: {:>7.2} MB", total_offsets_size as f64 / mb);
    println!("Total File Size: {:>7.2} GB", total_file_size as f64 / (mb * 1024.0));
    println!(
        "Average Loading Time: {:>7.2} ms | {:>7.2} µs",
        avg_duration.as_millis() as f64,
        avg_duration.as_micros() as f64
    );

    Ok(())
}
}

View File

@ -3,55 +3,23 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, snapshot::ReceiptMask};
use reth_db::{open_db_read_only, snapshot::ReceiptMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider,
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
impl Command {
pub(crate) fn generate_receipts_snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let block_range = self.block_range();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let segment: segments::Receipts = segments::Receipts::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), block_range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Receipts.filename(&block_range, &tx_range),
SnapshotSegment::Receipts.filename_with_configuration(
filters,
compression,
&block_range,
&tx_range,
),
)?;
Ok(())
}
pub(crate) fn bench_receipts_snapshot(
&self,
db_path: &Path,
@ -61,14 +29,18 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone());
let provider = factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let block_range = self.block_range();
let mut rng = rand::thread_rng();
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())

View File

@ -3,56 +3,23 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, snapshot::TransactionMask};
use reth_db::{open_db_read_only, snapshot::TransactionMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, SnapshotSegment, TransactionSignedNoHash,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory,
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
impl Command {
pub(crate) fn generate_transactions_snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<DB>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let block_range = self.block_range();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let segment = segments::Transactions::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), block_range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Transactions.filename(&block_range, &tx_range),
SnapshotSegment::Transactions.filename_with_configuration(
filters,
compression,
&block_range,
&tx_range,
),
)?;
Ok(())
}
pub(crate) fn bench_transactions_snapshot(
&self,
db_path: &Path,
@ -62,19 +29,21 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone());
let provider = factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let block_range = self.block_range();
let mut rng = rand::thread_rng();
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range.clone())?;
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

View File

@ -158,6 +158,11 @@ impl SegmentHeader {
*self.block_range.start()
}
/// Returns the last block number of the segment (the inclusive upper bound of
/// its block range).
pub fn block_end(&self) -> BlockNumber {
    let &last = self.block_range.end();
    last
}
/// Returns the first transaction number of the segment.
pub fn tx_start(&self) -> TxNumber {
*self.tx_range.start()

View File

@ -31,7 +31,7 @@ impl Default for Headers {
}
impl Segment for Headers {
fn segment() -> SnapshotSegment {
fn segment(&self) -> SnapshotSegment {
SnapshotSegment::Headers
}
@ -45,7 +45,7 @@ impl Segment for Headers {
let mut jar = prepare_jar::<DB, 3>(
provider,
directory,
Self::segment(),
self.segment(),
self.config,
range.clone(),
range_len,

View File

@ -37,7 +37,7 @@ pub trait Segment: Default {
) -> ProviderResult<()>;
/// Returns this struct's [`SnapshotSegment`].
fn segment() -> SnapshotSegment;
fn segment(&self) -> SnapshotSegment;
/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(

View File

@ -28,7 +28,7 @@ impl Default for Receipts {
}
impl Segment for Receipts {
fn segment() -> SnapshotSegment {
fn segment(&self) -> SnapshotSegment {
SnapshotSegment::Receipts
}
@ -44,7 +44,7 @@ impl Segment for Receipts {
let mut jar = prepare_jar::<DB, 1>(
provider,
directory,
Self::segment(),
self.segment(),
self.config,
block_range,
tx_range_len,

View File

@ -28,7 +28,7 @@ impl Default for Transactions {
}
impl Segment for Transactions {
fn segment() -> SnapshotSegment {
fn segment(&self) -> SnapshotSegment {
SnapshotSegment::Transactions
}
@ -44,7 +44,7 @@ impl Segment for Transactions {
let mut jar = prepare_jar::<DB, 1>(
provider,
directory,
Self::segment(),
self.segment(),
self.config,
block_range,
tx_range_len,

View File

@ -210,9 +210,10 @@ impl<DB: Database> Snapshotter<DB> {
let temp = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY);
let provider = self.provider_factory.provider()?;
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let filename = S::segment().filename(&block_range, &tx_range);
let segment = S::default();
let filename = segment.segment().filename(&block_range, &tx_range);
S::default().snapshot::<DB>(&provider, temp.clone(), block_range)?;
segment.snapshot::<DB>(&provider, temp.clone(), block_range)?;
reth_primitives::fs::rename(temp.join(&filename), self.snapshots_path.join(filename))?;
}

View File

@ -39,6 +39,10 @@ impl InclusionFilter for Cuckoo {
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
Ok(self.filter.contains(element))
}
/// Reports the filter's memory footprint as given by the underlying cuckoo
/// filter's `memory_usage` — presumably bytes; confirm against the filter
/// crate's documentation.
fn size(&self) -> usize {
    self.filter.memory_usage()
}
}
impl std::fmt::Debug for Cuckoo {

View File

@ -11,6 +11,8 @@ pub trait InclusionFilter {
/// Checks if the element belongs to the inclusion list. **There might be false positives.**
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError>;
fn size(&self) -> usize;
}
/// Enum with different [`InclusionFilter`] types.
@ -36,4 +38,11 @@ impl InclusionFilter for InclusionFilters {
InclusionFilters::Unused => todo!(),
}
}
/// Size reported by the underlying filter; `0` for the unused variant.
fn size(&self) -> usize {
    if let InclusionFilters::Cuckoo(cuckoo) = self {
        cuckoo.size()
    } else {
        0
    }
}
}

View File

@ -201,6 +201,21 @@ where
&self.user_header
}
/// Returns the size in bytes occupied by the offsets list.
pub fn offsets_size(&self) -> usize {
    self.offsets.size_in_bytes()
}
/// Returns the size in bytes occupied by the filters.
///
/// NOTE(review): delegates to `self.size()` — presumably the `InclusionFilter`
/// impl on this type, which reports `0` when no filter is configured; confirm
/// the method resolution.
pub fn filter_size(&self) -> usize {
    self.size()
}
/// Returns the size in bytes occupied by the offsets index.
pub fn offsets_index_size(&self) -> usize {
    self.offsets_index.size_in_bytes()
}
/// Gets a reference to the compressor.
pub fn compressor(&self) -> Option<&Compressors> {
self.compressor.as_ref()
@ -480,6 +495,10 @@ where
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
}
/// Size of the configured filter, or `0` when no filter is set.
fn size(&self) -> usize {
    match self.filter.as_ref() {
        Some(filter) => filter.size(),
        None => 0,
    }
}
}
impl<H> PerfectHashingFunction for NippyJar<H>