feat(bin, snapshot): SnapshotSegment, headers segment & refactors (#4979)

Alexey Shekhirin
2023-10-12 21:51:57 +03:00
committed by GitHub
parent d3794f2c4d
commit e86b80a019
17 changed files with 396 additions and 205 deletions

Cargo.lock generated
View File

@ -6038,6 +6038,7 @@ dependencies = [
"byteorder",
"bytes",
"c-kzg",
"clap",
"crc",
"criterion",
"derive_more",
@ -6321,8 +6322,10 @@ name = "reth-snapshot"
version = "0.1.0-alpha.10"
dependencies = [
"assert_matches",
"clap",
"reth-db",
"reth-interfaces",
"reth-nippy-jar",
"reth-primitives",
"reth-provider",
"reth-stages",

View File

@ -19,7 +19,7 @@ normal = [
[dependencies]
# reth
reth-config = { path = "../../crates/config" }
reth-primitives = { workspace = true, features = ["arbitrary"] }
reth-primitives = { workspace = true, features = ["arbitrary", "clap"] }
reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
# TODO: Temporary use of the test-utils feature
reth-provider = { workspace = true, features = ["test-utils"] }
@ -48,7 +48,7 @@ reth-payload-builder.workspace = true
reth-basic-payload-builder = { path = "../../crates/payload/basic" }
reth-discv4 = { path = "../../crates/net/discv4" }
reth-prune = { path = "../../crates/prune" }
reth-snapshot = { path = "../../crates/snapshot" }
reth-snapshot = { path = "../../crates/snapshot", features = ["clap"] }
reth-trie = { path = "../../crates/trie" }
reth-nippy-jar = { path = "../../crates/storage/nippy-jar" }

View File

@ -1,6 +1,8 @@
use super::JarConfig;
use reth_db::DatabaseEnvRO;
use reth_primitives::ChainSpec;
use reth_primitives::{
snapshot::{Compression, Filters},
ChainSpec, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{sync::Arc, time::Instant};
@ -15,7 +17,9 @@ pub(crate) enum BenchKind {
pub(crate) fn bench<F1, F2>(
bench_kind: BenchKind,
db: (DatabaseEnvRO, Arc<ChainSpec>),
jar_config: JarConfig,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
mut snapshot_method: F1,
database_method: F2,
) -> eyre::Result<()>
@ -23,12 +27,11 @@ where
F1: FnMut() -> eyre::Result<()>,
F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
{
let (mode, compression, phf) = jar_config;
let (db, chain) = db;
println!();
println!("############");
println!("## [{mode:?}] [{compression:?}] [{phf:?}] [{bench_kind:?}]");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
{
let start = Instant::now();
snapshot_method()?;

View File

@ -1,78 +1,37 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction, Rows, Snapshots,
Command,
};
use crate::utils::DbTool;
use rand::{seq::SliceRandom, Rng};
use reth_db::{
cursor::DbCursorRO, database::Database, open_db_read_only, snapshot::create_snapshot_T1_T2,
table::Decompress, tables, transaction::DbTx, DatabaseEnvRO,
};
use reth_db::{database::Database, open_db_read_only, table::Decompress, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{BlockNumber, ChainSpec, Header};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
use std::{path::Path, sync::Arc};
use tables::*;
impl Command {
pub(crate) fn generate_headers_snapshot(
&self,
tool: &DbTool<'_, DatabaseEnvRO>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mut jar = self.prepare_jar(2, (Snapshots::Headers, compression, phf), tool, || {
// Generates the dataset to train a zstd dictionary if necessary, with the most recent
// rows (at most 1000).
let dataset = tool.db.view(|tx| {
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::Headers>>()?;
let v1 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::HeaderTD>>()?;
let v2 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
Ok::<Rows, eyre::Error>(vec![v1, v2])
})??;
Ok(dataset)
})?;
tool.db.view(|tx| {
// Hacky type inference. TODO fix
let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
let _ = none_vec.take();
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>()?;
let mut hashes = None;
let segment = Headers::new(
compression,
if self.with_filters {
hashes = Some(
cursor
.walk(Some(RawKey::from(self.from as u64)))?
.take(self.block_interval)
.map(|row| {
row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())
}),
);
}
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
tx,
self.from as u64..=(self.from as u64 + self.block_interval as u64),
None,
// We already prepared the dictionary beforehand
none_vec,
hashes,
self.block_interval,
&mut jar,
)
})??;
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot(&tool.db.tx()?, self.from..=(self.from + self.block_interval - 1))?;
Ok(())
}
@ -83,14 +42,26 @@ impl Command {
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mode = Snapshots::Headers;
let jar_config = (mode, compression, phf);
let mut row_indexes = (self.from..(self.from + self.block_interval)).collect::<Vec<_>>();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let range = self.from..=(self.from + self.block_interval - 1);
let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load_without_header(&self.get_file_path(jar_config))?;
let mut jar = NippyJar::load_without_header(&get_snapshot_segment_file_name(
SnapshotSegment::Headers,
filters,
compression,
&range,
))?;
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
@ -103,13 +74,15 @@ impl Command {
bench(
bench_kind,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
for num in row_indexes.iter() {
Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>(num - self.from)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?[0],
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?[0],
)?;
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.header_by_number(num as
@ -120,8 +93,8 @@ impl Command {
|provider| {
for num in row_indexes.iter() {
provider
.header_by_number(*num as u64)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
.header_by_number(*num)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?;
}
Ok(())
},
@ -137,7 +110,9 @@ impl Command {
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
Header::decompress(
cursor
@ -167,7 +142,9 @@ impl Command {
bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
let header = Header::decompress(
cursor
@ -176,7 +153,7 @@ impl Command {
)?;
// Might be a false positive, so in the real world we have to validate it
assert!(header.hash_slow() == header_hash);
assert_eq!(header.hash_slow(), header_hash);
Ok(())
},
|provider| {

View File

@ -1,39 +1,54 @@
use crate::utils::DbTool;
use clap::{clap_derive::ValueEnum, Parser};
use eyre::WrapErr;
use crate::{db::genesis_value_parser, utils::DbTool};
use clap::Parser;
use itertools::Itertools;
use reth_db::{database::Database, open_db_read_only, table::Table, tables, DatabaseEnvRO};
use reth_db::open_db_read_only;
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{
compression::{DecoderDictionary, Decompressor},
NippyJar,
};
use reth_primitives::ChainSpec;
use reth_provider::providers::SnapshotProvider;
use std::{
path::{Path, PathBuf},
sync::Arc,
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::providers::SnapshotProvider;
use std::{path::Path, sync::Arc};
mod bench;
mod headers;
pub(crate) type Rows = Vec<Vec<Vec<u8>>>;
pub(crate) type JarConfig = (Snapshots, Compression, PerfectHashingFunction);
#[derive(Parser, Debug)]
/// Arguments for the `reth db snapshot` command.
pub struct Command {
/// Snapshot categories to generate.
modes: Vec<Snapshots>,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
/// - holesky
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = genesis_value_parser,
global = true,
)]
chain: Arc<ChainSpec>,
/// Snapshot segments to generate.
segments: Vec<SnapshotSegment>,
/// Starting block for the snapshot.
#[arg(long, short, default_value = "0")]
from: usize,
from: BlockNumber,
/// Number of blocks in the snapshot.
#[arg(long, short, default_value = "500000")]
block_interval: usize,
block_interval: u64,
/// Flag to enable database-to-snapshot benchmarking.
#[arg(long, default_value = "false")]
@ -52,7 +67,7 @@ pub struct Command {
with_filters: bool,
/// Specifies the perfect hashing function to use.
#[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "mphf"))]
#[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "fmph"))]
phf: Vec<PerfectHashingFunction>,
}
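
With the new arguments in place, the subcommand can be exercised roughly as follows. This is only a sketch: the invoking binary/subcommand prefix and the long-flag spellings for fields not shown in this hunk are assumptions; flags that are omitted fall back to their declared defaults.

use clap::Parser;

fn parse_example() -> eyre::Result<Command> {
    // Positional `segments` plus the block-range flags; `--chain`, `--compression`,
    // `--phf`, `--bench` etc. keep their defaults here.
    let cmd = Command::try_parse_from([
        "snapshot",                  // placeholder for the invoking binary/subcommand
        "headers",                   // segments: Vec<SnapshotSegment>
        "--from", "100000",
        "--block-interval", "500000",
    ])?;
    Ok(cmd)
}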
@ -65,7 +80,7 @@ impl Command {
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
let all_combinations = self
.modes
.segments
.iter()
.cartesian_product(self.compression.iter())
.cartesian_product(self.phf.iter());
@ -77,11 +92,14 @@ impl Command {
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
Snapshots::Headers => {
self.generate_headers_snapshot(&tool, *compression, *phf)?
}
Snapshots::Transactions => todo!(),
Snapshots::Receipts => todo!(),
SnapshotSegment::Headers => self.generate_headers_snapshot(
&tool,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Transactions => todo!(),
SnapshotSegment::Receipts => todo!(),
}
}
}
@ -90,15 +108,16 @@ impl Command {
if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
match mode {
Snapshots::Headers => self.bench_headers_snapshot(
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
Snapshots::Transactions => todo!(),
Snapshots::Receipts => todo!(),
SnapshotSegment::Transactions => todo!(),
SnapshotSegment::Receipts => todo!(),
}
}
}
@ -121,96 +140,6 @@ impl Command {
}
}
Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from as u64 }, decompressors))
}
/// Returns a [`NippyJar`] according to the desired configuration.
fn prepare_jar<F: Fn() -> eyre::Result<Rows>>(
&self,
num_columns: usize,
jar_config: JarConfig,
tool: &DbTool<'_, DatabaseEnvRO>,
prepare_compression: F,
) -> eyre::Result<NippyJar> {
let (mode, compression, phf) = jar_config;
let snap_file = self.get_file_path(jar_config);
let table_name = match mode {
Snapshots::Headers => tables::Headers::NAME,
Snapshots::Transactions | Snapshots::Receipts => tables::Transactions::NAME,
};
let total_rows = tool.db.view(|tx| {
let table_db = tx.inner.open_db(Some(table_name)).wrap_err("Could not open db.")?;
let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {}", table_name))?;
Ok::<usize, eyre::Error>((stats.entries() - self.from).min(self.block_interval))
})??;
assert!(
total_rows >= self.block_interval,
"Not enough rows on database {} < {}.",
total_rows,
self.block_interval
);
let mut nippy_jar = NippyJar::new_without_header(num_columns, snap_file.as_path());
nippy_jar = match compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset)?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};
if self.with_filters {
nippy_jar = nippy_jar.with_cuckoo_filter(self.block_interval);
nippy_jar = match phf {
PerfectHashingFunction::Mphf => nippy_jar.with_mphf(),
PerfectHashingFunction::GoMphf => nippy_jar.with_gomphf(),
};
}
Ok(nippy_jar)
}
/// Generates a filename according to the desired configuration.
fn get_file_path(&self, jar_config: JarConfig) -> PathBuf {
let (mode, compression, phf) = jar_config;
format!(
"snapshot_{mode:?}_{}_{}_{compression:?}_{phf:?}",
self.from,
self.from + self.block_interval
)
.into()
Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from }, decompressors))
}
}
#[derive(Debug, Copy, Clone, ValueEnum)]
pub(crate) enum Snapshots {
Headers,
Transactions,
Receipts,
}
#[derive(Debug, Copy, Clone, ValueEnum, Default)]
pub(crate) enum Compression {
Lz4,
Zstd,
ZstdWithDictionary,
#[default]
Uncompressed,
}
#[derive(Debug, Copy, Clone, ValueEnum)]
pub(crate) enum PerfectHashingFunction {
Mphf,
GoMphf,
}

View File

@ -38,6 +38,7 @@ tokio-stream.workspace = true
# misc
bytes.workspace = true
byteorder = "1"
clap = { workspace = true, features = ["derive"], optional = true }
serde.workspace = true
serde_json.workspace = true
serde_with = "3.3.0"
@ -95,6 +96,7 @@ test-utils = ["dep:plain_hasher", "dep:hash-db", "dep:ethers-core"]
# value-256 controls whether transaction Value fields are DB-encoded as 256 bits instead of the
# default of 128 bits.
value-256 = ["reth-codecs/value-256"]
clap = ["dep:clap"]
[[bench]]
name = "recover_ecdsa_crit"

View File

@ -35,20 +35,19 @@ pub mod listener;
mod log;
mod net;
mod peer;
mod precaution;
pub mod proofs;
mod prune;
mod receipt;
pub mod serde_helper;
pub mod snapshot;
pub mod stage;
mod storage;
/// Helpers for working with transactions
mod transaction;
pub mod trie;
mod withdrawal;
mod precaution;
pub use account::{Account, Bytecode};
pub use block::{
Block, BlockBody, BlockBodyRoots, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
@ -83,6 +82,7 @@ pub use prune::{
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts};
pub use serde_helper::JsonU256;
pub use snapshot::SnapshotSegment;
pub use storage::StorageEntry;
pub use transaction::{
util::secp256k1::{public_key_to_address, recover_signer, sign_message},

View File

@ -0,0 +1,11 @@
#[derive(Debug, Copy, Clone, Default)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
#[allow(missing_docs)]
/// Snapshot compression
pub enum Compression {
Lz4,
Zstd,
ZstdWithDictionary,
#[default]
Uncompressed,
}

View File

@ -0,0 +1,33 @@
#[derive(Debug, Copy, Clone)]
/// Snapshot filters.
pub enum Filters {
/// Snapshot uses filters with [InclusionFilter] and [PerfectHashingFunction].
WithFilters(InclusionFilter, PerfectHashingFunction),
/// Snapshot doesn't use any filters.
WithoutFilters,
}
impl Filters {
/// Returns `true` if snapshot uses filters.
pub const fn has_filters(&self) -> bool {
matches!(self, Self::WithFilters(_, _))
}
}
#[derive(Debug, Copy, Clone)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Snapshot inclusion filter. Also see [Filters].
pub enum InclusionFilter {
/// Cuckoo filter
Cuckoo,
}
#[derive(Debug, Copy, Clone)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Snapshot perfect hashing function. Also see [Filters].
pub enum PerfectHashingFunction {
/// Fingerprint-Based Minimal Perfect Hash Function
Fmph,
/// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization
GoFmph,
}
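
A minimal usage sketch of the two filter modes and the has_filters helper, using only the names exported from this module:

use reth_primitives::snapshot::{Filters, InclusionFilter, PerfectHashingFunction};

fn main() {
    // Cuckoo inclusion filter combined with the fmph perfect hashing function.
    let with = Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph);
    assert!(with.has_filters());

    // No filters at all.
    let without = Filters::WithoutFilters;
    assert!(!without.has_filters());
}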

View File

@ -0,0 +1,9 @@
//! Snapshot primitives.
mod compression;
mod filters;
mod segment;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::SnapshotSegment;

View File

@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Segment of the data that can be snapshotted.
pub enum SnapshotSegment {
/// Snapshot segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTD` tables.
Headers,
/// Snapshot segment responsible for the `Transactions` table.
Transactions,
/// Snapshot segment responsible for the `Receipts` table.
Receipts,
}
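
Because of the cfg_attr above, enabling the crate's `clap` feature lets segments be parsed straight from CLI strings. A small sketch, assuming the feature is enabled; the kebab-case value names come from clap's ValueEnum derive defaults:

use clap::ValueEnum;
use reth_primitives::SnapshotSegment;

fn main() {
    // "headers", "transactions" and "receipts" are the derived value names.
    let segment = SnapshotSegment::from_str("headers", /* ignore_case */ true).unwrap();
    assert!(matches!(segment, SnapshotSegment::Headers));
}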

View File

@ -16,6 +16,7 @@ reth-primitives.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true
reth-nippy-jar = { path = "../storage/nippy-jar" }
# async
tokio = { workspace = true, features = ["sync"] }
@ -23,6 +24,7 @@ tokio = { workspace = true, features = ["sync"] }
# misc
thiserror.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
[dev-dependencies]
# reth
@ -32,3 +34,6 @@ reth-stages = { path = "../stages", features = ["test-utils"] }
# misc
assert_matches.workspace = true
[features]
clap = ["dep:clap"]

View File

@ -10,6 +10,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod error;
pub mod segments;
mod snapshotter;
pub use error::SnapshotterError;

View File

@ -0,0 +1,97 @@
use crate::segments::{prepare_jar, Segment};
use reth_db::{
cursor::DbCursorRO, snapshot::create_snapshot_T1_T2_T3, table::Table, tables,
transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::RethResult;
use reth_primitives::{
snapshot::{Compression, Filters},
BlockNumber, SnapshotSegment,
};
use std::ops::RangeInclusive;
/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug)]
pub struct Headers {
compression: Compression,
filters: Filters,
}
impl Headers {
/// Creates new instance of [Headers] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
Self { compression, filters }
}
// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<'tx, T: Table<Key = BlockNumber>>(
&self,
tx: &impl DbTx<'tx>,
range: &RangeInclusive<BlockNumber>,
range_len: usize,
) -> RethResult<Vec<Vec<u8>>> {
let mut cursor = tx.cursor_read::<RawTable<T>>()?;
Ok(cursor
.walk_back(Some(RawKey::from(*range.end())))?
.take(range_len.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>())
}
}
impl Segment for Headers {
fn snapshot<'tx>(
&self,
tx: &impl DbTx<'tx>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let range_len = range.clone().count();
let mut jar = prepare_jar::<3, tables::Headers>(
tx,
SnapshotSegment::Headers,
self.filters,
self.compression,
range.clone(),
range_len,
|| {
Ok([
self.dataset_for_compression::<tables::Headers>(tx, &range, range_len)?,
self.dataset_for_compression::<tables::HeaderTD>(tx, &range, range_len)?,
self.dataset_for_compression::<tables::CanonicalHeaders>(
tx, &range, range_len,
)?,
])
},
)?;
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut hashes = None;
if self.filters.has_filters() {
hashes = Some(
cursor
.walk(Some(RawKey::from(*range.start())))?
.take(range_len)
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
);
}
create_snapshot_T1_T2_T3::<
tables::Headers,
tables::HeaderTD,
tables::CanonicalHeaders,
BlockNumber,
>(
tx,
range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
range_len,
&mut jar,
)?;
Ok(())
}
}
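
The segment is driven the same way the CLI path earlier in this diff drives it: build it with a compression/filter configuration, then hand it a read transaction and an inclusive block range. A condensed sketch (the transaction is assumed to come from the caller):

use reth_primitives::{
    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
    BlockNumber,
};
use reth_snapshot::segments::{Headers, Segment};

fn snapshot_headers<'tx>(
    tx: &impl reth_db::transaction::DbTx<'tx>,
    from: BlockNumber,
    block_interval: u64,
) -> reth_interfaces::RethResult<()> {
    let segment = Headers::new(
        Compression::Zstd,
        Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph),
    );
    // Snapshots `block_interval` blocks starting at `from` (inclusive range).
    segment.snapshot(tx, from..=(from + block_interval - 1))
}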

View File

@ -0,0 +1,108 @@
//! Snapshot segment implementations and utilities.
mod headers;
pub use headers::Headers;
use reth_db::{table::Table, transaction::DbTx};
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockNumber, SnapshotSegment,
};
use std::{ops::RangeInclusive, path::PathBuf};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
/// A segment represents a snapshotting of some portion of the data.
pub trait Segment {
/// Snapshot data using the provided range.
fn snapshot<'tx>(
&self,
tx: &impl DbTx<'tx>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()>;
}
/// Returns a [`NippyJar`] according to the desired configuration.
pub(crate) fn prepare_jar<'tx, const COLUMNS: usize, T: Table>(
tx: &impl DbTx<'tx>,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
range: RangeInclusive<BlockNumber>,
range_len: usize,
prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
) -> RethResult<NippyJar> {
let mut nippy_jar = NippyJar::new_without_header(
COLUMNS,
&get_snapshot_segment_file_name(segment, filters, compression, &range),
);
nippy_jar = match compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset.to_vec())?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};
if let Filters::WithFilters(inclusion_filter, phf) = filters {
let total_rows = (tx.entries::<T>()? - *range.start() as usize).min(range_len);
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
nippy_jar = match phf {
PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
};
}
Ok(nippy_jar)
}
/// Returns file name for the provided segment, filters, compression and range.
pub fn get_snapshot_segment_file_name(
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
range: &RangeInclusive<BlockNumber>,
) -> PathBuf {
let segment_name = match segment {
SnapshotSegment::Headers => "headers",
SnapshotSegment::Transactions => "transactions",
SnapshotSegment::Receipts => "receipts",
};
let filters_name = match filters {
Filters::WithFilters(inclusion_filter, phf) => {
let inclusion_filter = match inclusion_filter {
InclusionFilter::Cuckoo => "cuckoo",
};
let phf = match phf {
PerfectHashingFunction::Fmph => "fmph",
PerfectHashingFunction::GoFmph => "gofmph",
};
format!("{inclusion_filter}-{phf}")
}
Filters::WithoutFilters => "none".to_string(),
};
let compression_name = match compression {
Compression::Lz4 => "lz4",
Compression::Zstd => "zstd",
Compression::ZstdWithDictionary => "zstd-dict",
Compression::Uncompressed => "uncompressed",
};
format!(
"snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
range.start(),
range.end(),
)
.into()
}
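
For reference, the naming scheme above produces file names like the one below; the expected string follows directly from the match arms and the format string:

use reth_primitives::{
    snapshot::{Compression, Filters},
    SnapshotSegment,
};
use reth_snapshot::segments::get_snapshot_segment_file_name;

fn main() {
    let name = get_snapshot_segment_file_name(
        SnapshotSegment::Headers,
        Filters::WithoutFilters,
        Compression::Lz4,
        &(0u64..=499_999),
    );
    assert_eq!(name.to_string_lossy(), "snapshot_headers_0_499999_none_lz4");
}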

View File

@ -182,13 +182,13 @@ where
}
/// Adds [`phf::Fmph`] perfect hashing function.
pub fn with_mphf(mut self) -> Self {
pub fn with_fmph(mut self) -> Self {
self.phf = Some(Functions::Fmph(Fmph::new()));
self
}
/// Adds [`phf::GoFmph`] perfect hashing function.
pub fn with_gomphf(mut self) -> Self {
pub fn with_gofmph(mut self) -> Self {
self.phf = Some(Functions::GoFmph(GoFmph::new()));
self
}
@ -556,12 +556,12 @@ mod tests {
assert_eq!(indexes, collect_indexes(&loaded_nippy));
};
// mphf bytes size for 100 values of 32 bytes: 54
nippy = nippy.with_mphf();
// fmph bytes size for 100 values of 32 bytes: 54
nippy = nippy.with_fmph();
check_phf(&mut nippy);
// mphf bytes size for 100 values of 32 bytes: 46
nippy = nippy.with_gomphf();
// fmph bytes size for 100 values of 32 bytes: 46
nippy = nippy.with_gofmph();
check_phf(&mut nippy);
}
@ -771,7 +771,7 @@ mod tests {
NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_mphf();
.with_fmph();
nippy.prepare_compression(data.clone()).unwrap();
nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
@ -849,7 +849,7 @@ mod tests {
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path())
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_mphf();
.with_fmph();
nippy.prepare_compression(data.clone()).unwrap();
nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
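
After the rename, callers chain the builder as in the tests above. A minimal sketch; the column count and file path are placeholders:

use reth_nippy_jar::NippyJar;

fn build_jar(rows: usize) -> NippyJar {
    NippyJar::new_without_header(3, std::path::Path::new("headers.jar"))
        .with_zstd(true, 5000)
        .with_cuckoo_filter(rows)
        .with_fmph() // previously `with_mphf`
}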

View File

@ -164,7 +164,7 @@ mod test {
}
if with_filter {
nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_mphf();
nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
}
let tx = db.tx().unwrap();