From 9c8eca6a499e4d5de6af5255694c9833d9a6cea1 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 6 Oct 2023 17:33:56 +0100 Subject: [PATCH] feat: add `reth db snapshot ` command (#4889) --- Cargo.lock | 34 ++- bin/reth/Cargo.toml | 3 + bin/reth/src/db/mod.rs | 6 + bin/reth/src/db/snapshots/bench.rs | 48 ++++ bin/reth/src/db/snapshots/headers.rs | 192 ++++++++++++++++ bin/reth/src/db/snapshots/mod.rs | 216 ++++++++++++++++++ crates/storage/db/Cargo.toml | 1 + crates/storage/db/src/snapshot.rs | 19 +- crates/storage/nippy-jar/Cargo.toml | 24 +- .../storage/nippy-jar/src/compression/lz4.rs | 83 +++++++ .../storage/nippy-jar/src/compression/mod.rs | 53 +++-- .../storage/nippy-jar/src/compression/zstd.rs | 80 ++++--- crates/storage/nippy-jar/src/cursor.rs | 98 +++++--- crates/storage/nippy-jar/src/error.rs | 4 + crates/storage/nippy-jar/src/filter/cuckoo.rs | 4 + crates/storage/nippy-jar/src/lib.rs | 208 ++++++++++++++--- crates/storage/nippy-jar/src/phf/fmph.rs | 1 - crates/storage/nippy-jar/src/phf/go_fmph.rs | 1 - .../provider/src/providers/snapshot.rs | 38 ++- 19 files changed, 954 insertions(+), 159 deletions(-) create mode 100644 bin/reth/src/db/snapshots/bench.rs create mode 100644 bin/reth/src/db/snapshots/headers.rs create mode 100644 bin/reth/src/db/snapshots/mod.rs create mode 100644 crates/storage/nippy-jar/src/compression/lz4.rs diff --git a/Cargo.lock b/Cargo.lock index 980b6f16d..4c6f123a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -823,17 +823,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "bloomfilter" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934" -dependencies = [ - "bit-vec", - "getrandom 0.2.10", - "siphasher 1.0.0", -] - [[package]] name = "blst" version = "0.3.11" @@ -4033,6 +4022,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lz4_flex" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" + [[package]] name = "mach2" version = "0.4.1" @@ -4779,7 +4774,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher 0.3.11", + "siphasher", ] [[package]] @@ -5421,6 +5416,7 @@ dependencies = [ "human_bytes", "humantime", "hyper", + "itertools 0.11.0", "jemalloc-ctl", "jemallocator", "metrics", @@ -5430,6 +5426,7 @@ dependencies = [ "pin-project", "pretty_assertions", "proptest", + "rand 0.8.5", "reth-auto-seal-consensus", "reth-basic-payload-builder", "reth-beacon-consensus", @@ -5444,6 +5441,7 @@ dependencies = [ "reth-net-nat", "reth-network", "reth-network-api", + "reth-nippy-jar", "reth-payload-builder", "reth-primitives", "reth-provider", @@ -5639,6 +5637,7 @@ dependencies = [ "reth-metrics", "reth-nippy-jar", "reth-primitives", + "reth-tracing", "secp256k1", "serde", "serde_json", @@ -5978,9 +5977,10 @@ version = "0.1.0-alpha.10" dependencies = [ "anyhow", "bincode", - "bloomfilter", "bytes", "cuckoofilter", + "hex", + "lz4_flex", "memmap2 0.7.1", "ph", "rand 0.8.5", @@ -5988,6 +5988,8 @@ dependencies = [ "sucds 0.8.1", "tempfile", "thiserror", + "tracing", + "tracing-appender", "zstd", ] @@ -7216,12 +7218,6 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" -[[package]] -name = "siphasher" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe" - [[package]] name = "sketches-ddsketch" version = "0.2.1" diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index cba9d4a89..f7be0688b 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -50,6 +50,7 @@ reth-discv4 = { path = "../../crates/net/discv4" } reth-prune = { path = "../../crates/prune" } reth-snapshot = { path = "../../crates/snapshot" } reth-trie = { path = "../../crates/trie" } +reth-nippy-jar = { path = "../../crates/storage/nippy-jar" } # crypto alloy-rlp.workspace = true @@ -76,6 +77,7 @@ metrics.workspace = true # test vectors generation proptest.workspace = true +rand.workspace = true # tui comfy-table = "7.0" @@ -102,6 +104,7 @@ pretty_assertions = "1.3.0" humantime = "2.1.0" const-str = "0.5.6" boyer-moore-magiclen = "0.2.16" +itertools.workspace = true [target.'cfg(not(windows))'.dependencies] jemallocator = { version = "0.5.0", optional = true } diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 1c9f15bcd..4f22064f7 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -24,6 +24,7 @@ mod clear; mod diff; mod get; mod list; +mod snapshots; /// DB List TUI mod tui; @@ -85,6 +86,8 @@ pub enum Subcommands { }, /// Deletes all table entries Clear(clear::Command), + /// Snapshots tables from database + Snapshot(snapshots::Command), /// Lists current and local database versions Version, /// Returns the full database path @@ -210,6 +213,9 @@ impl Command { let db = open_db(&db_path, self.db.log_level)?; command.execute(&db)?; } + Subcommands::Snapshot(command) => { + command.execute(&db_path, self.db.log_level, self.chain.clone())?; + } Subcommands::Version => { let local_db_version = match get_db_version(&db_path) { Ok(version) => Some(version), diff --git a/bin/reth/src/db/snapshots/bench.rs b/bin/reth/src/db/snapshots/bench.rs new file mode 100644 index 000000000..8c926d226 --- /dev/null +++ b/bin/reth/src/db/snapshots/bench.rs @@ -0,0 +1,48 @@ +use super::JarConfig; +use reth_db::DatabaseEnvRO; +use reth_primitives::ChainSpec; +use reth_provider::{DatabaseProviderRO, ProviderFactory}; +use std::{sync::Arc, time::Instant}; + +#[derive(Debug)] +pub(crate) enum BenchKind { + Walk, + RandomAll, + RandomOne, + RandomHash, +} + +pub(crate) fn bench( + bench_kind: BenchKind, + db: (DatabaseEnvRO, Arc), + jar_config: JarConfig, + mut snapshot_method: F1, + database_method: F2, +) -> eyre::Result<()> +where + F1: FnMut() -> eyre::Result<()>, + F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>, +{ + let (mode, compression, phf) = jar_config; + let (db, chain) = db; + + println!(); + println!("############"); + println!("## [{mode:?}] [{compression:?}] [{phf:?}] [{bench_kind:?}]"); + { + let start = Instant::now(); + snapshot_method()?; + let end = start.elapsed().as_micros(); + println!("# snapshot {bench_kind:?} | {end} μs"); + } + { + let factory = ProviderFactory::new(db, chain); + let provider = factory.provider()?; + let start = Instant::now(); + database_method(provider)?; + let end = start.elapsed().as_micros(); + println!("# database {bench_kind:?} | {end} μs"); + } + + Ok(()) +} diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs new file mode 100644 index 000000000..387c1e53d --- /dev/null +++ 
b/bin/reth/src/db/snapshots/headers.rs @@ -0,0 +1,192 @@ +use super::{ + bench::{bench, BenchKind}, + Command, Compression, PerfectHashingFunction, Rows, Snapshots, +}; +use crate::utils::DbTool; +use rand::{seq::SliceRandom, Rng}; +use reth_db::{ + cursor::DbCursorRO, database::Database, open_db_read_only, snapshot::create_snapshot_T1_T2, + table::Decompress, tables, transaction::DbTx, DatabaseEnvRO, +}; +use reth_interfaces::db::LogLevel; +use reth_nippy_jar::NippyJar; +use reth_primitives::{BlockNumber, ChainSpec, Header}; +use reth_provider::{HeaderProvider, ProviderError, ProviderFactory}; +use std::{path::Path, sync::Arc}; +use tables::*; + +impl Command { + pub(crate) fn generate_headers_snapshot( + &self, + tool: &DbTool<'_, DatabaseEnvRO>, + compression: Compression, + phf: PerfectHashingFunction, + ) -> eyre::Result<()> { + let mut jar = self.prepare_jar(2, (Snapshots::Headers, compression, phf), tool, || { + // Generates the dataset to train a zstd dictionary if necessary, with the most recent + // rows (at most 1000). + let dataset = tool.db.view(|tx| { + let mut cursor = tx.cursor_read::>()?; + let v1 = cursor + .walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))? + .take(self.block_interval.min(1000)) + .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist")) + .collect::>(); + let mut cursor = tx.cursor_read::>()?; + let v2 = cursor + .walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))? + .take(self.block_interval.min(1000)) + .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist")) + .collect::>(); + Ok::(vec![v1, v2]) + })??; + Ok(dataset) + })?; + + tool.db.view(|tx| { + // Hacky type inference. TODO fix + let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]); + let _ = none_vec.take(); + + // Generate list of hashes for filters & PHF + let mut cursor = tx.cursor_read::>()?; + let mut hashes = None; + if self.with_filters { + hashes = Some( + cursor + .walk(Some(RawKey::from(self.from as u64)))? + .take(self.block_interval) + .map(|row| { + row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()) + }), + ); + } + + create_snapshot_T1_T2::( + tx, + self.from as u64..=(self.from as u64 + self.block_interval as u64), + None, + // We already prepared the dictionary beforehand + none_vec, + hashes, + self.block_interval, + &mut jar, + ) + })??; + + Ok(()) + } + + pub(crate) fn bench_headers_snapshot( + &self, + db_path: &Path, + log_level: Option, + chain: Arc, + compression: Compression, + phf: PerfectHashingFunction, + ) -> eyre::Result<()> { + let mode = Snapshots::Headers; + let jar_config = (mode, compression, phf); + let mut row_indexes = (self.from..(self.from + self.block_interval)).collect::>(); + let mut rng = rand::thread_rng(); + let mut dictionaries = None; + let mut jar = NippyJar::load_without_header(&self.get_file_path(jar_config))?; + + let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?; + let mut cursor = if !decompressors.is_empty() { + provider.cursor_with_decompressors(decompressors) + } else { + provider.cursor() + }; + + for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] { + bench( + bench_kind, + (open_db_read_only(db_path, log_level)?, chain.clone()), + jar_config, + || { + for num in row_indexes.iter() { + Header::decompress( + cursor + .row_by_number_with_cols::<0b01, 2>(num - self.from)? 
+ .ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?[0], + )?; + // TODO: replace with below when eventually SnapshotProvider re-uses cursor + // provider.header_by_number(num as + // u64)?.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?; + } + Ok(()) + }, + |provider| { + for num in row_indexes.iter() { + provider + .header_by_number(*num as u64)? + .ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?; + } + Ok(()) + }, + )?; + + // For random walk + row_indexes.shuffle(&mut rng); + } + + // BENCHMARK QUERYING A RANDOM HEADER BY NUMBER + { + let num = row_indexes[rng.gen_range(0..row_indexes.len())]; + bench( + BenchKind::RandomOne, + (open_db_read_only(db_path, log_level)?, chain.clone()), + jar_config, + || { + Header::decompress( + cursor + .row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)? + .ok_or(ProviderError::HeaderNotFound((num as u64).into()))?[0], + )?; + Ok(()) + }, + |provider| { + provider + .header_by_number(num as u64)? + .ok_or(ProviderError::HeaderNotFound((num as u64).into()))?; + Ok(()) + }, + )?; + } + + // BENCHMARK QUERYING A RANDOM HEADER BY HASH + { + let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64; + let header_hash = + ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone()) + .header_by_number(num)? + .ok_or(ProviderError::HeaderNotFound(num.into()))? + .hash_slow(); + + bench( + BenchKind::RandomHash, + (open_db_read_only(db_path, log_level)?, chain.clone()), + jar_config, + || { + let header = Header::decompress( + cursor + .row_by_key_with_cols::<0b01, 2>(header_hash.as_slice())? + .ok_or(ProviderError::HeaderNotFound(header_hash.into()))?[0], + )?; + + // Might be a false positive, so in the real world we have to validate it + assert!(header.hash_slow() == header_hash); + Ok(()) + }, + |provider| { + provider + .header(&header_hash)? + .ok_or(ProviderError::HeaderNotFound(header_hash.into()))?; + Ok(()) + }, + )?; + } + Ok(()) + } +} diff --git a/bin/reth/src/db/snapshots/mod.rs b/bin/reth/src/db/snapshots/mod.rs new file mode 100644 index 000000000..47b5cc4ea --- /dev/null +++ b/bin/reth/src/db/snapshots/mod.rs @@ -0,0 +1,216 @@ +use crate::utils::DbTool; +use clap::{clap_derive::ValueEnum, Parser}; +use eyre::WrapErr; +use itertools::Itertools; +use reth_db::{database::Database, open_db_read_only, table::Table, tables, DatabaseEnvRO}; +use reth_interfaces::db::LogLevel; +use reth_nippy_jar::{ + compression::{DecoderDictionary, Decompressor}, + NippyJar, +}; +use reth_primitives::ChainSpec; +use reth_provider::providers::SnapshotProvider; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +mod bench; +mod headers; + +pub(crate) type Rows = Vec>>; +pub(crate) type JarConfig = (Snapshots, Compression, PerfectHashingFunction); + +#[derive(Parser, Debug)] +/// Arguments for the `reth db snapshot` command. +pub struct Command { + /// Snapshot categories to generate. + modes: Vec, + + /// Starting block for the snapshot. + #[arg(long, short, default_value = "0")] + from: usize, + + /// Number of blocks in the snapshot. + #[arg(long, short, default_value = "500000")] + block_interval: usize, + + /// Flag to enable database-to-snapshot benchmarking. + #[arg(long, default_value = "false")] + bench: bool, + + /// Flag to skip snapshot creation and only run benchmarks on existing snapshots. + #[arg(long, default_value = "false")] + only_bench: bool, + + /// Compression algorithms to use. 
+ #[arg(long, short, value_delimiter = ',', default_value = "lz4")] + compression: Vec, + + /// Flag to enable inclusion list filters and PHFs. + #[arg(long, default_value = "true")] + with_filters: bool, + + /// Specifies the perfect hashing function to use. + #[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "mphf"))] + phf: Vec, +} + +impl Command { + /// Execute `db snapshot` command + pub fn execute( + self, + db_path: &Path, + log_level: Option, + chain: Arc, + ) -> eyre::Result<()> { + let all_combinations = self + .modes + .iter() + .cartesian_product(self.compression.iter()) + .cartesian_product(self.phf.iter()); + + { + let db = open_db_read_only(db_path, None)?; + let tool = DbTool::new(&db, chain.clone())?; + + if !self.only_bench { + for ((mode, compression), phf) in all_combinations.clone() { + match mode { + Snapshots::Headers => { + self.generate_headers_snapshot(&tool, *compression, *phf)? + } + Snapshots::Transactions => todo!(), + Snapshots::Receipts => todo!(), + } + } + } + } + + if self.only_bench || self.bench { + for ((mode, compression), phf) in all_combinations { + match mode { + Snapshots::Headers => self.bench_headers_snapshot( + db_path, + log_level, + chain.clone(), + *compression, + *phf, + )?, + Snapshots::Transactions => todo!(), + Snapshots::Receipts => todo!(), + } + } + } + + Ok(()) + } + + /// Returns a [`SnapshotProvider`] of the provided [`NippyJar`], alongside a list of + /// [`DecoderDictionary`] and [`Decompressor`] if necessary. + fn prepare_jar_provider<'a>( + &self, + jar: &'a mut NippyJar, + dictionaries: &'a mut Option>>, + ) -> eyre::Result<(SnapshotProvider<'a>, Vec>)> { + let mut decompressors: Vec> = vec![]; + if let Some(reth_nippy_jar::compression::Compressors::Zstd(zstd)) = jar.compressor_mut() { + if zstd.use_dict { + *dictionaries = zstd.generate_decompress_dictionaries(); + decompressors = zstd.generate_decompressors(dictionaries.as_ref().expect("qed"))?; + } + } + + Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from as u64 }, decompressors)) + } + + /// Returns a [`NippyJar`] according to the desired configuration. 
+ fn prepare_jar eyre::Result>( + &self, + num_columns: usize, + jar_config: JarConfig, + tool: &DbTool<'_, DatabaseEnvRO>, + prepare_compression: F, + ) -> eyre::Result { + let (mode, compression, phf) = jar_config; + let snap_file = self.get_file_path(jar_config); + let table_name = match mode { + Snapshots::Headers => tables::Headers::NAME, + Snapshots::Transactions | Snapshots::Receipts => tables::Transactions::NAME, + }; + + let total_rows = tool.db.view(|tx| { + let table_db = tx.inner.open_db(Some(table_name)).wrap_err("Could not open db.")?; + let stats = tx + .inner + .db_stat(&table_db) + .wrap_err(format!("Could not find table: {}", table_name))?; + + Ok::((stats.entries() - self.from).min(self.block_interval)) + })??; + + assert!( + total_rows >= self.block_interval, + "Not enough rows on database {} < {}.", + total_rows, + self.block_interval + ); + + let mut nippy_jar = NippyJar::new_without_header(num_columns, snap_file.as_path()); + nippy_jar = match compression { + Compression::Lz4 => nippy_jar.with_lz4(), + Compression::Zstd => nippy_jar.with_zstd(false, 0), + Compression::ZstdWithDictionary => { + let dataset = prepare_compression()?; + + nippy_jar = nippy_jar.with_zstd(true, 5_000_000); + nippy_jar.prepare_compression(dataset)?; + nippy_jar + } + Compression::Uncompressed => nippy_jar, + }; + + if self.with_filters { + nippy_jar = nippy_jar.with_cuckoo_filter(self.block_interval); + nippy_jar = match phf { + PerfectHashingFunction::Mphf => nippy_jar.with_mphf(), + PerfectHashingFunction::GoMphf => nippy_jar.with_gomphf(), + }; + } + + Ok(nippy_jar) + } + + /// Generates a filename according to the desired configuration. + fn get_file_path(&self, jar_config: JarConfig) -> PathBuf { + let (mode, compression, phf) = jar_config; + format!( + "snapshot_{mode:?}_{}_{}_{compression:?}_{phf:?}", + self.from, + self.from + self.block_interval + ) + .into() + } +} + +#[derive(Debug, Copy, Clone, ValueEnum)] +pub(crate) enum Snapshots { + Headers, + Transactions, + Receipts, +} + +#[derive(Debug, Copy, Clone, ValueEnum, Default)] +pub(crate) enum Compression { + Lz4, + Zstd, + ZstdWithDictionary, + #[default] + Uncompressed, +} + +#[derive(Debug, Copy, Clone, ValueEnum)] +pub(crate) enum PerfectHashingFunction { + Mphf, + GoMphf, +} diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 2c895e018..069c3737c 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -15,6 +15,7 @@ reth-interfaces.workspace = true reth-codecs = { path = "../codecs" } reth-libmdbx = { path = "../libmdbx-rs", optional = true, features = ["return-borrowed"] } reth-nippy-jar = { path = "../nippy-jar" } +reth-tracing = { path = "../../tracing" } # codecs serde = { workspace = true, default-features = false } diff --git a/crates/storage/db/src/snapshot.rs b/crates/storage/db/src/snapshot.rs index cea6350d6..9736c3e1f 100644 --- a/crates/storage/db/src/snapshot.rs +++ b/crates/storage/db/src/snapshot.rs @@ -8,6 +8,7 @@ use crate::{ }; use reth_interfaces::RethResult; use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey}; +use reth_tracing::tracing::*; use std::{error::Error as StdError, ops::RangeInclusive}; /// Macro that generates snapshot creation functions that take an arbitratry number of [`Table`] and @@ -25,6 +26,7 @@ macro_rules! generate_snapshot_func { /// /// * `tx`: Database transaction. /// * `range`: Data range for columns in tables. + /// * `additional`: Additional columns which can't be straight straightforwardly walked on. 
/// * `keys`: Iterator of keys (eg. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`. /// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent. /// * `row_count`: Total rows to add to `NippyJar`. Must match row count in `range`. @@ -37,6 +39,7 @@ macro_rules! generate_snapshot_func { ( tx: &impl DbTx<'tx>, range: RangeInclusive, + additional: Option, Box>>>>>, dict_compression_set: Option>>>, keys: Option>>, row_count: usize, @@ -44,16 +47,23 @@ macro_rules! generate_snapshot_func { ) -> RethResult<()> where K: Key + Copy { + let additional = additional.unwrap_or_default(); + debug!(target: "reth::snapshot", ?range, "Creating snapshot {:?} and {} more columns.", vec![$($tbl::NAME,)+], additional.len()); + let range: RangeInclusive> = RawKey::new(*range.start())..=RawKey::new(*range.end()); // Create PHF and Filter if required if let Some(keys) = keys { + debug!(target: "reth::snapshot", "Calculating Filter, PHF and offset index list"); nippy_jar.prepare_index(keys, row_count)?; + debug!(target: "reth::snapshot", "Filter, PHF and offset index list calculated."); } // Create compression dictionaries if required if let Some(data_sets) = dict_compression_set { + debug!(target: "reth::snapshot", "Creating compression dictionaries."); nippy_jar.prepare_compression(data_sets)?; + debug!(target: "reth::snapshot", "Compression dictionaries created."); } // Creates the cursors for the columns @@ -75,7 +85,12 @@ macro_rules! generate_snapshot_func { $(Box::new([< $tbl _iter>]),)+ ]; - nippy_jar.freeze(col_iterators, row_count as u64)?; + + debug!(target: "reth::snapshot", jar=?nippy_jar, "Generating snapshot file."); + + nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?; + + debug!(target: "reth::snapshot", jar=?nippy_jar, "Snapshot file generated."); Ok(()) } @@ -84,4 +99,4 @@ macro_rules! 
generate_snapshot_func { }; } -generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4),); +generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),); diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml index 3aa7f92d7..9be19824a 100644 --- a/crates/storage/nippy-jar/Cargo.toml +++ b/crates/storage/nippy-jar/Cargo.toml @@ -12,20 +12,28 @@ description = "Immutable data store format" name = "reth_nippy_jar" [dependencies] -memmap2 = "0.7.1" -bloomfilter = "1" -zstd = { version = "0.12", features = ["experimental", "zdict_builder"] } + +# filter ph = "0.8.0" -thiserror .workspace = true +cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] } + +# compression +zstd = { version = "0.12", features = ["experimental", "zdict_builder"] } +lz4_flex = { version = "0.11", default-features = false } + +# offsets +sucds = "~0.8" + +memmap2 = "0.7.1" bincode = "1.3" serde = { version = "1.0", features = ["derive"] } bytes.workspace = true -cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] } tempfile.workspace = true -sucds = "~0.8" - +tracing = "0.1.0" +tracing-appender = "0.2" anyhow = "1.0" - +thiserror.workspace = true +hex = "*" [dev-dependencies] rand = { version = "0.8", features = ["small_rng"] } diff --git a/crates/storage/nippy-jar/src/compression/lz4.rs b/crates/storage/nippy-jar/src/compression/lz4.rs new file mode 100644 index 000000000..be014994a --- /dev/null +++ b/crates/storage/nippy-jar/src/compression/lz4.rs @@ -0,0 +1,83 @@ +use crate::{compression::Compression, NippyJarError}; +use serde::{Deserialize, Serialize}; + +/// Wrapper type for `lz4_flex` that implements [`Compression`]. +#[derive(Debug, PartialEq, Serialize, Deserialize, Default)] +#[non_exhaustive] +pub struct Lz4; + +impl Compression for Lz4 { + fn decompress_to(&self, value: &[u8], dest: &mut Vec) -> Result<(), NippyJarError> { + let previous_length = dest.len(); + + // SAFETY: We're setting len to the existing capacity. + unsafe { + dest.set_len(dest.capacity()); + } + + match lz4_flex::decompress_into(value, &mut dest[previous_length..]) { + Ok(written) => { + // SAFETY: `compress_into` can only write if there's enough capacity. Therefore, it + // shouldn't write more than our capacity. + unsafe { + dest.set_len(previous_length + written); + } + Ok(()) + } + Err(_) => { + // SAFETY: we are resetting it to the previous value. + unsafe { + dest.set_len(previous_length); + } + Err(NippyJarError::OutputTooSmall) + } + } + } + + fn decompress(&self, value: &[u8]) -> Result, NippyJarError> { + let mut multiplier = 1; + + loop { + match lz4_flex::decompress(value, multiplier * value.len()) { + Ok(v) => return Ok(v), + Err(err) => { + multiplier *= 2; + if multiplier == 16 { + return Err(NippyJarError::Custom(err.to_string())) + } + } + } + } + } + + fn compress_to(&self, src: &[u8], dest: &mut Vec) -> Result { + let previous_length = dest.len(); + + // SAFETY: We're setting len to the existing capacity. + unsafe { + dest.set_len(dest.capacity()); + } + + match lz4_flex::compress_into(src, &mut dest[previous_length..]) { + Ok(written) => { + // SAFETY: `compress_into` can only write if there's enough capacity. Therefore, it + // shouldn't write more than our capacity. + unsafe { + dest.set_len(previous_length + written); + } + Ok(written) + } + Err(_) => { + // SAFETY: we are resetting it to the previous value. 
+ unsafe { + dest.set_len(previous_length); + } + Err(NippyJarError::OutputTooSmall) + } + } + } + + fn compress(&self, src: &[u8]) -> Result, NippyJarError> { + Ok(lz4_flex::compress(src)) + } +} diff --git a/crates/storage/nippy-jar/src/compression/mod.rs b/crates/storage/nippy-jar/src/compression/mod.rs index 5008dfdf6..a8f99fa53 100644 --- a/crates/storage/nippy-jar/src/compression/mod.rs +++ b/crates/storage/nippy-jar/src/compression/mod.rs @@ -1,17 +1,24 @@ use crate::NippyJarError; use serde::{Deserialize, Serialize}; -use std::io::Write; mod zstd; -pub use self::zstd::{Zstd, ZstdState}; +pub use self::zstd::{DecoderDictionary, Decompressor, Zstd, ZstdState}; +mod lz4; +pub use self::lz4::Lz4; /// Trait that will compress column values pub trait Compression: Serialize + for<'a> Deserialize<'a> { + /// Appends decompressed data to the dest buffer. Requires `dest` to have sufficient capacity. + fn decompress_to(&self, value: &[u8], dest: &mut Vec) -> Result<(), NippyJarError>; + /// Returns decompressed data. fn decompress(&self, value: &[u8]) -> Result, NippyJarError>; - /// Compresses data from `src` to `dest` - fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError>; + /// Appends compressed data from `src` to `dest`. `dest`. Requires `dest` to have sufficient + /// capacity. + /// + /// Returns number of bytes written to `dest`. + fn compress_to(&self, src: &[u8], dest: &mut Vec) -> Result; /// Compresses data from `src` fn compress(&self, src: &[u8]) -> Result, NippyJarError>; @@ -37,36 +44,54 @@ pub trait Compression: Serialize + for<'a> Deserialize<'a> { #[cfg_attr(test, derive(PartialEq))] pub enum Compressors { Zstd(Zstd), - // Avoids irrefutable let errors. Remove this after adding another one. - Unused, + Lz4(Lz4), } impl Compression for Compressors { + fn decompress_to(&self, value: &[u8], dest: &mut Vec) -> Result<(), NippyJarError> { + match self { + Compressors::Zstd(zstd) => zstd.decompress_to(value, dest), + Compressors::Lz4(lz4) => lz4.decompress_to(value, dest), + } + } fn decompress(&self, value: &[u8]) -> Result, NippyJarError> { match self { Compressors::Zstd(zstd) => zstd.decompress(value), - Compressors::Unused => unimplemented!(), + Compressors::Lz4(lz4) => lz4.decompress(value), } } - fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> { - match self { - Compressors::Zstd(zstd) => zstd.compress_to(src, dest), - Compressors::Unused => unimplemented!(), + fn compress_to(&self, src: &[u8], dest: &mut Vec) -> Result { + let initial_capacity = dest.capacity(); + loop { + let result = match self { + Compressors::Zstd(zstd) => zstd.compress_to(src, dest), + Compressors::Lz4(lz4) => lz4.compress_to(src, dest), + }; + + match result { + Ok(v) => return Ok(v), + Err(err) => match err { + NippyJarError::OutputTooSmall => { + dest.reserve(initial_capacity); + } + _ => return Err(err), + }, + } } } fn compress(&self, src: &[u8]) -> Result, NippyJarError> { match self { Compressors::Zstd(zstd) => zstd.compress(src), - Compressors::Unused => unimplemented!(), + Compressors::Lz4(lz4) => lz4.compress(src), } } fn is_ready(&self) -> bool { match self { Compressors::Zstd(zstd) => zstd.is_ready(), - Compressors::Unused => unimplemented!(), + Compressors::Lz4(lz4) => lz4.is_ready(), } } @@ -76,7 +101,7 @@ impl Compression for Compressors { ) -> Result<(), NippyJarError> { match self { Compressors::Zstd(zstd) => zstd.prepare_compression(columns), - Compressors::Unused => Ok(()), + Compressors::Lz4(lz4) => 
lz4.prepare_compression(columns), } } } diff --git a/crates/storage/nippy-jar/src/compression/zstd.rs b/crates/storage/nippy-jar/src/compression/zstd.rs index 494ed6734..df1182f28 100644 --- a/crates/storage/nippy-jar/src/compression/zstd.rs +++ b/crates/storage/nippy-jar/src/compression/zstd.rs @@ -4,10 +4,9 @@ use std::{ fs::File, io::{Read, Write}, }; -use zstd::{ - bulk::{Compressor, Decompressor}, - dict::DecoderDictionary, -}; +use tracing::*; +use zstd::bulk::Compressor; +pub use zstd::{bulk::Decompressor, dict::DecoderDictionary}; type RawDictionary = Vec; @@ -26,7 +25,7 @@ pub struct Zstd { /// Compression level. A level of `0` uses zstd's default (currently `3`). pub(crate) level: i32, /// Uses custom dictionaries to compress data. - pub(crate) use_dict: bool, + pub use_dict: bool, /// Max size of a dictionary pub(crate) max_dict_size: usize, /// List of column dictionaries. @@ -87,6 +86,8 @@ impl Zstd { let mut compressors = None; if let Some(dictionaries) = &self.raw_dictionaries { + debug!(target: "nippy-jar", count=?dictionaries.len(), "Generating ZSTD compressor dictionaries."); + let mut cmp = Vec::with_capacity(dictionaries.len()); for dict in dictionaries { @@ -99,10 +100,11 @@ impl Zstd { } } - /// Compresses a value using a dictionary. + /// Compresses a value using a dictionary. Reserves additional capacity for `buffer` if + /// necessary. pub fn compress_with_dictionary( column_value: &[u8], - tmp_buf: &mut Vec, + buffer: &mut Vec, handle: &mut File, compressor: Option<&mut Compressor<'_>>, ) -> Result<(), NippyJarError> { @@ -112,16 +114,16 @@ impl Zstd { // enough, the compressed buffer will actually be larger. We keep retrying. // If we eventually fail, it probably means it's another kind of error. let mut multiplier = 1; - while let Err(err) = compressor.compress_to_buffer(column_value, tmp_buf) { - tmp_buf.reserve(column_value.len() * multiplier); + while let Err(err) = compressor.compress_to_buffer(column_value, buffer) { + buffer.reserve(column_value.len() * multiplier); multiplier += 1; if multiplier == 5 { return Err(NippyJarError::Disconnect(err)) } } - handle.write_all(tmp_buf)?; - tmp_buf.clear(); + handle.write_all(buffer)?; + buffer.clear(); } else { handle.write_all(column_value)?; } @@ -129,38 +131,46 @@ impl Zstd { Ok(()) } - /// Decompresses a value using a dictionary to a user provided buffer. + /// Appends a decompressed value using a dictionary to a user provided buffer. pub fn decompress_with_dictionary( column_value: &[u8], output: &mut Vec, decompressor: &mut Decompressor<'_>, ) -> Result<(), NippyJarError> { - let mut multiplier = 1; + let previous_length = output.len(); - // Just an estimation. - let required_capacity = column_value.len() * 2; - - output.reserve(required_capacity.saturating_sub(output.capacity())); - - // Decompressor requires the destination buffer to be big enough to write to, otherwise it - // fails. However, we don't know how big it will be. We keep retrying. - // If we eventually fail, it probably means it's another kind of error. - while let Err(err) = decompressor.decompress_to_buffer(column_value, output) { - output.reserve( - Decompressor::upper_bound(column_value).unwrap_or(required_capacity) * multiplier, - ); - - multiplier += 1; - if multiplier == 5 { - return Err(NippyJarError::Disconnect(err)) - } + // SAFETY: We're setting len to the existing capacity. 
+ unsafe { + output.set_len(output.capacity()); } - Ok(()) + match decompressor.decompress_to_buffer(column_value, &mut output[previous_length..]) { + Ok(written) => { + // SAFETY: `decompress_to_buffer` can only write if there's enough capacity. + // Therefore, it shouldn't write more than our capacity. + unsafe { + output.set_len(previous_length + written); + } + Ok(()) + } + Err(_) => { + // SAFETY: we are resetting it to the previous value. + unsafe { + output.set_len(previous_length); + } + Err(NippyJarError::OutputTooSmall) + } + } } } impl Compression for Zstd { + fn decompress_to(&self, value: &[u8], dest: &mut Vec) -> Result<(), NippyJarError> { + let mut decoder = zstd::Decoder::with_dictionary(value, &[])?; + decoder.read_to_end(dest)?; + Ok(()) + } + fn decompress(&self, value: &[u8]) -> Result, NippyJarError> { let mut decompressed = Vec::with_capacity(value.len() * 2); let mut decoder = zstd::Decoder::new(value)?; @@ -168,13 +178,15 @@ impl Compression for Zstd { Ok(decompressed) } - fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> { + fn compress_to(&self, src: &[u8], dest: &mut Vec) -> Result { + let before = dest.len(); + let mut encoder = zstd::Encoder::new(dest, self.level)?; encoder.write_all(src)?; - encoder.finish()?; + let dest = encoder.finish()?; - Ok(()) + Ok(dest.len() - before) } fn compress(&self, src: &[u8]) -> Result, NippyJarError> { diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs index 1fd7170c9..19e39fa0c 100644 --- a/crates/storage/nippy-jar/src/cursor.rs +++ b/crates/storage/nippy-jar/src/cursor.rs @@ -1,10 +1,10 @@ use crate::{ compression::{Compression, Zstd}, - InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, Row, + InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow, }; use memmap2::Mmap; use serde::{de::Deserialize, ser::Serialize}; -use std::{clone::Clone, fs::File}; +use std::{fs::File, ops::Range}; use sucds::int_vectors::Access; use zstd::bulk::Decompressor; @@ -19,14 +19,13 @@ pub struct NippyJarCursor<'a, H = ()> { file_handle: File, /// Data file. mmap_handle: Mmap, - /// Temporary buffer to unload data to (if necessary), without reallocating memory on each - /// retrieval. - tmp_buf: Vec, + /// Internal buffer to unload data to without reallocating memory on each retrieval. + internal_buffer: Vec, /// Cursor row position. row: u64, } -impl<'a, H> std::fmt::Debug for NippyJarCursor<'a, H> +impl<'a, H: std::fmt::Debug> std::fmt::Debug for NippyJarCursor<'a, H> where H: Send + Sync + Serialize + for<'b> Deserialize<'b> + core::fmt::Debug, { @@ -37,7 +36,7 @@ where impl<'a, H> NippyJarCursor<'a, H> where - H: Send + Sync + Serialize + for<'b> Deserialize<'b>, + H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug, { pub fn new( jar: &'a NippyJar, @@ -53,7 +52,8 @@ where zstd_decompressors, file_handle: file, mmap_handle: mmap, - tmp_buf: vec![], + // Makes sure that we have enough buffer capacity to decompress any row of data. + internal_buffer: Vec::with_capacity(jar.max_row_size), row: 0, }) } @@ -69,7 +69,7 @@ where /// /// Example usage would be querying a transactions file with a transaction hash which is **NOT** /// stored in file. - pub fn row_by_key(&mut self, key: &[u8]) -> Result, NippyJarError> { + pub fn row_by_key(&mut self, key: &[u8]) -> Result>, NippyJarError> { if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) { // TODO: is it worth to parallize both? 
@@ -93,13 +93,15 @@ where } /// Returns a row by its number. - pub fn row_by_number(&mut self, row: usize) -> Result, NippyJarError> { + pub fn row_by_number(&mut self, row: usize) -> Result>, NippyJarError> { self.row = row as u64; self.next_row() } /// Returns the current value and advances the row. - pub fn next_row(&mut self) -> Result, NippyJarError> { + pub fn next_row(&mut self) -> Result>, NippyJarError> { + self.internal_buffer.clear(); + if self.row as usize * self.jar.columns >= self.jar.offsets.len() { // Has reached the end return Ok(None) @@ -114,7 +116,14 @@ where self.row += 1; - Ok(Some(row)) + Ok(Some( + row.into_iter() + .map(|v| match v { + ValueRange::Mmap(range) => &self.mmap_handle[range], + ValueRange::Internal(range) => &self.internal_buffer[range], + }) + .collect(), + )) } /// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a @@ -127,7 +136,7 @@ where pub fn row_by_key_with_cols( &mut self, key: &[u8], - ) -> Result, NippyJarError> { + ) -> Result>, NippyJarError> { if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) { // TODO: is it worth to parallize both? @@ -154,7 +163,7 @@ where pub fn row_by_number_with_cols( &mut self, row: usize, - ) -> Result, NippyJarError> { + ) -> Result>, NippyJarError> { self.row = row as u64; self.next_row_with_cols::() } @@ -164,8 +173,8 @@ where /// Uses a `MASK` to only read certain columns from the row. pub fn next_row_with_cols( &mut self, - ) -> Result, NippyJarError> { - debug_assert!(COLUMNS == self.jar.columns); + ) -> Result>, NippyJarError> { + self.internal_buffer.clear(); if self.row as usize * self.jar.columns >= self.jar.offsets.len() { // Has reached the end @@ -179,45 +188,68 @@ where self.read_value(column, &mut row)? } } - self.row += 1; - Ok(Some(row)) + Ok(Some( + row.into_iter() + .map(|v| match v { + ValueRange::Mmap(range) => &self.mmap_handle[range], + ValueRange::Internal(range) => &self.internal_buffer[range], + }) + .collect(), + )) } - /// Takes the column index and reads the value for the corresponding column. - fn read_value(&mut self, column: usize, row: &mut Row) -> Result<(), NippyJarError> { + /// Takes the column index and reads the range value for the corresponding column. + fn read_value( + &mut self, + column: usize, + row: &mut Vec, + ) -> Result<(), NippyJarError> { // Find out the offset of the column value let offset_pos = self.row as usize * self.jar.columns + column; let value_offset = self.jar.offsets.select(offset_pos).expect("should exist"); - let column_value = if self.jar.offsets.len() == (offset_pos + 1) { + let column_offset_range = if self.jar.offsets.len() == (offset_pos + 1) { // It's the last column of the last row - &self.mmap_handle[value_offset..] 
+ value_offset..self.mmap_handle.len() } else { let next_value_offset = self.jar.offsets.select(offset_pos + 1).expect("should exist"); - &self.mmap_handle[value_offset..next_value_offset] + value_offset..next_value_offset }; if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() { - self.tmp_buf.clear(); - + let from: usize = self.internal_buffer.len(); if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) { - Zstd::decompress_with_dictionary(column_value, &mut self.tmp_buf, decompressor)?; + Zstd::decompress_with_dictionary( + &self.mmap_handle[column_offset_range], + &mut self.internal_buffer, + decompressor, + )?; } + let to = self.internal_buffer.len(); - debug_assert!(!self.tmp_buf.is_empty()); - - row.push(self.tmp_buf.clone()); - } else if let Some(compression) = &self.jar.compressor { + row.push(ValueRange::Internal(from..to)); + } else if let Some(compression) = self.jar.compressor() { // Uses the chosen default decompressor - row.push(compression.decompress(column_value)?); + let from = self.internal_buffer.len(); + compression + .decompress_to(&self.mmap_handle[column_offset_range], &mut self.internal_buffer)?; + let to = self.internal_buffer.len(); + + row.push(ValueRange::Internal(from..to)); } else { // Not compressed - // TODO: return Cow<&> instead of copying if there's no compression - row.push(column_value.to_vec()) + row.push(ValueRange::Mmap(column_offset_range)); } Ok(()) } } + +/// Helper type that stores the range of the decompressed column value either on a `mmap` slice or +/// on the internal buffer. +enum ValueRange { + Mmap(Range), + Internal(Range), +} diff --git a/crates/storage/nippy-jar/src/error.rs b/crates/storage/nippy-jar/src/error.rs index 1ea1b0f4d..b17d3d216 100644 --- a/crates/storage/nippy-jar/src/error.rs +++ b/crates/storage/nippy-jar/src/error.rs @@ -7,6 +7,8 @@ pub enum NippyJarError { Internal(#[from] Box), #[error(transparent)] Disconnect(#[from] std::io::Error), + #[error("{0}")] + Custom(String), #[error(transparent)] Bincode(#[from] Box), #[error(transparent)] @@ -33,4 +35,6 @@ pub enum NippyJarError { PHFMissing, #[error("NippyJar was built without an index.")] UnsupportedFilterQuery, + #[error("Compression or decompression requires a bigger destination output.")] + OutputTooSmall, } diff --git a/crates/storage/nippy-jar/src/filter/cuckoo.rs b/crates/storage/nippy-jar/src/filter/cuckoo.rs index 139af65e4..2e4110e58 100644 --- a/crates/storage/nippy-jar/src/filter/cuckoo.rs +++ b/crates/storage/nippy-jar/src/filter/cuckoo.rs @@ -17,6 +17,10 @@ pub struct Cuckoo { impl Cuckoo { pub fn new(max_capacity: usize) -> Self { + // CuckooFilter might return `NotEnoughSpace` even if they are remaining elements, if it's + // close to capacity. Therefore, we increase it. + let max_capacity = max_capacity + 100 + max_capacity / 3; + Cuckoo { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) } } } diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index 1e7bcd44e..bf8d683d0 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -24,6 +24,7 @@ use sucds::{ mii_sequences::{EliasFano, EliasFanoBuilder}, Serializable, }; +use tracing::*; pub mod filter; use filter::{Cuckoo, InclusionFilter, InclusionFilters}; @@ -43,8 +44,9 @@ pub use cursor::NippyJarCursor; const NIPPY_JAR_VERSION: usize = 1; -/// A [`Row`] is a list of its selected column values. 
-type Row = Vec>; +/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a +/// memory-mapped file. +type RefRow<'a> = Vec<&'a [u8]>; /// Alias type for a column value wrapped in `Result` pub type ColumnResult = Result>; @@ -73,7 +75,7 @@ pub type ColumnResult = Result>; /// /// Ultimately, the `freeze` function yields two files: a data file containing both the data and its /// configuration, and an index file that houses the offsets and offsets_index. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq))] pub struct NippyJar { /// The version of the NippyJar format. @@ -95,11 +97,34 @@ pub struct NippyJar { /// Offsets within the file for each column value, arranged by row and column. #[serde(skip)] offsets: EliasFano, + /// Maximum uncompressed row size of the set. This will enable decompression without any + /// resizing of the output buffer. + #[serde(skip)] + max_row_size: usize, /// Data path for file. Index file will be `{path}.idx` #[serde(skip)] path: Option, } +impl std::fmt::Debug for NippyJar { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NippyJar") + .field("version", &self.version) + .field("user_header", &self.user_header) + .field("columns", &self.columns) + .field("compressor", &self.compressor) + .field("filter", &self.filter) + .field("phf", &self.phf) + .field("offsets_index (len)", &self.offsets_index.len()) + .field("offsets_index (size in bytes)", &self.offsets_index.size_in_bytes()) + .field("offsets (len)", &self.offsets.len()) + .field("offsets (size in bytes)", &self.offsets.size_in_bytes()) + .field("path", &self.path) + .field("max_row_size", &self.max_row_size) + .finish_non_exhaustive() + } +} + impl NippyJar<()> { /// Creates a new [`NippyJar`] without an user-defined header data. pub fn new_without_header(columns: usize, path: &Path) -> Self { @@ -119,7 +144,7 @@ impl NippyJar<()> { impl NippyJar where - H: Send + Sync + Serialize + for<'a> Deserialize<'a>, + H: Send + Sync + Serialize + for<'a> Deserialize<'a> + std::fmt::Debug, { /// Creates a new [`NippyJar`] with a user-defined header data. pub fn new(columns: usize, path: &Path, user_header: H) -> Self { @@ -127,6 +152,7 @@ where version: NIPPY_JAR_VERSION, user_header, columns, + max_row_size: 0, compressor: None, filter: None, phf: None, @@ -143,6 +169,12 @@ where self } + /// Adds [`compression::Lz4`] compression. + pub fn with_lz4(mut self) -> Self { + self.compressor = Some(Compressors::Lz4(compression::Lz4::default())); + self + } + /// Adds [`filter::Cuckoo`] filter. pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self { self.filter = Some(InclusionFilters::Cuckoo(Cuckoo::new(max_capacity))); @@ -166,6 +198,16 @@ where &self.user_header } + /// Gets a reference to the compressor. + pub fn compressor(&self) -> Option<&Compressors> { + self.compressor.as_ref() + } + + /// Gets a mutable reference to the compressor. + pub fn compressor_mut(&mut self) -> Option<&mut Compressors> { + self.compressor.as_mut() + } + /// Loads the file configuration and returns [`Self`]. /// /// **The user must ensure the header type matches the one used during the jar's creation.** @@ -185,7 +227,8 @@ where let mmap = unsafe { memmap2::Mmap::map(&offsets_file)? 
}; let mut offsets_reader = mmap.as_ref(); obj.offsets = EliasFano::deserialize_from(&mut offsets_reader)?; - obj.offsets_index = PrefixSummedEliasFano::deserialize_from(offsets_reader)?; + obj.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_reader)?; + obj.max_row_size = bincode::deserialize_from(offsets_reader)?; Ok(obj) } @@ -211,6 +254,7 @@ where ) -> Result<(), NippyJarError> { // Makes any necessary preparations for the compressors if let Some(compression) = &mut self.compressor { + debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression."); compression.prepare_compression(columns)?; } Ok(()) @@ -226,15 +270,27 @@ where values: impl IntoIterator>, row_count: usize, ) -> Result<(), NippyJarError> { + debug!(target: "nippy-jar", ?row_count, "Preparing index."); + let values = values.into_iter().collect::, _>>()?; + + debug_assert!( + row_count == values.len(), + "Row count ({row_count}) differs from value list count ({}).", + values.len() + ); + let mut offsets_index = vec![0; row_count]; // Builds perfect hashing function from the values if let Some(phf) = self.phf.as_mut() { + debug!(target: "nippy-jar", ?row_count, values_count = ?values.len(), "Setting keys for perfect hashing function."); phf.set_keys(&values)?; } if self.filter.is_some() || self.phf.is_some() { + debug!(target: "nippy-jar", ?row_count, "Creating filter and offsets_index."); + for (row_num, v) in values.into_iter().enumerate() { if let Some(filter) = self.filter.as_mut() { filter.add(v.as_ref())?; @@ -248,6 +304,7 @@ where } } + debug!(target: "nippy-jar", ?row_count, "Encoding offsets index list."); self.offsets_index = PrefixSummedEliasFano::from_slice(&offsets_index)?; Ok(()) } @@ -271,7 +328,7 @@ where // Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/ // dict) - let mut tmp_buf = Vec::with_capacity(100); + let mut tmp_buf = Vec::with_capacity(1_000_000); // Write all rows while taking all row start offsets let mut row_number = 0u64; @@ -279,16 +336,21 @@ where let mut column_iterators = columns.into_iter().map(|v| v.into_iter()).collect::>().into_iter(); + debug!(target: "nippy-jar", compressor=?self.compressor, "Writing rows."); + loop { let mut iterators = Vec::with_capacity(self.columns); // Write the column value of each row // TODO: iter_mut if we remove the IntoIterator interface. + let mut uncompressed_row_size = 0; for (column_number, mut column_iter) in column_iterators.enumerate() { offsets.push(file.stream_position()? as usize); match column_iter.next() { Some(Ok(value)) => { + uncompressed_row_size += value.len(); + if let Some(compression) = &self.compressor { // Special zstd case with dictionaries if let (Some(dict_compressors), Compressors::Zstd(_)) = @@ -301,7 +363,9 @@ where Some(dict_compressors.get_mut(column_number).expect("exists")), )?; } else { - compression.compress_to(&value, &mut file)?; + let before = tmp_buf.len(); + let len = compression.compress_to(&value, &mut tmp_buf)?; + file.write_all(&tmp_buf[before..before + len])?; } } else { file.write_all(&value)?; @@ -319,7 +383,10 @@ where iterators.push(column_iter); } + tmp_buf.clear(); row_number += 1; + self.max_row_size = self.max_row_size.max(uncompressed_row_size); + if row_number == total_rows { break } @@ -330,12 +397,16 @@ where // Write offsets and offset index to file self.freeze_offsets(offsets)?; + debug!(target: "nippy-jar", jar=?self, "Finished."); + Ok(()) } /// Freezes offsets and its own index. 
fn freeze_offsets(&mut self, offsets: Vec) -> Result<(), NippyJarError> { if !offsets.is_empty() { + debug!(target: "nippy-jar", "Encoding offsets list."); + let mut builder = EliasFanoBuilder::new(*offsets.last().expect("qed") + 1, offsets.len())?; @@ -344,9 +415,13 @@ where } self.offsets = builder.build().enable_rank(); } + + debug!(target: "nippy-jar", path=?self.index_path(), "Writing offsets and offsets index to file."); + let mut file = File::create(self.index_path())?; self.offsets.serialize_into(&mut file)?; - self.offsets_index.serialize_into(file)?; + self.offsets_index.serialize_into(&mut file)?; + self.max_row_size.serialize_into(file)?; Ok(()) } @@ -370,6 +445,8 @@ where let _ = phf.get_index(&[])?; } + debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file."); + Ok(File::create(self.data_path())?) } @@ -517,10 +594,6 @@ mod tests { // // Add more columns until max_capacity assert!(InclusionFilter::add(&mut nippy, &col1[2]).is_ok()); assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok()); - assert!(matches!( - InclusionFilter::add(&mut nippy, &col1[4]), - Err(NippyJarError::FilterMaxCapacity) - )); nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); @@ -542,13 +615,13 @@ mod tests { let file_path = tempfile::NamedTempFile::new().unwrap(); let nippy = NippyJar::new_without_header(num_columns, file_path.path()); - assert!(nippy.compressor.is_none()); + assert!(nippy.compressor().is_none()); let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000); - assert!(nippy.compressor.is_some()); + assert!(nippy.compressor().is_some()); - if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor { + if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() { assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady))); // Make sure the number of column iterators match the initial set up ones. 
@@ -567,7 +640,7 @@ mod tests { nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap(); - if let Some(Compressors::Zstd(zstd)) = &nippy.compressor { + if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() { assert!(matches!( (&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())), (compression::ZstdState::Ready, Some(columns)) if columns == num_columns @@ -580,11 +653,11 @@ mod tests { assert_eq!(nippy, loaded_nippy); let mut dicts = vec![]; - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() { dicts = zstd.generate_decompress_dictionaries().unwrap() } - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() { let mut cursor = NippyJarCursor::new( &loaded_nippy, Some(zstd.generate_decompressors(&dicts).unwrap()), @@ -594,12 +667,50 @@ mod tests { // Iterate over compressed values and compare let mut row_index = 0usize; while let Some(row) = cursor.next_row().unwrap() { - assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index])); + assert_eq!( + (row[0], row[1]), + (col1[row_index].as_slice(), col2[row_index].as_slice()) + ); row_index += 1; } } } + #[test] + fn test_lz4() { + let (col1, col2) = test_data(None); + let num_rows = col1.len() as u64; + let num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let nippy = NippyJar::new_without_header(num_columns, file_path.path()); + assert!(nippy.compressor().is_none()); + + let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4(); + assert!(nippy.compressor().is_some()); + + nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); + + let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + assert_eq!(nippy, loaded_nippy); + + if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() { + let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap(); + + // Iterate over compressed values and compare + let mut row_index = 0usize; + while let Some(row) = cursor.next_row().unwrap() { + assert_eq!( + (row[0], row[1]), + (col1[row_index].as_slice(), col2[row_index].as_slice()) + ); + row_index += 1; + } + } else { + panic!("Expected Lz4 compressor") + } + } + #[test] fn test_zstd_no_dictionaries() { let (col1, col2) = test_data(None); @@ -608,18 +719,18 @@ mod tests { let file_path = tempfile::NamedTempFile::new().unwrap(); let nippy = NippyJar::new_without_header(num_columns, file_path.path()); - assert!(nippy.compressor.is_none()); + assert!(nippy.compressor().is_none()); let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000); - assert!(nippy.compressor.is_some()); + assert!(nippy.compressor().is_some()); nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); assert_eq!(nippy, loaded_nippy); - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() { assert!(!zstd.use_dict); let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap(); @@ -627,7 +738,10 @@ mod tests { // Iterate over compressed values and compare let mut row_index = 0usize; while let Some(row) = cursor.next_row().unwrap() { - assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index])); + 
assert_eq!( + (row[0], row[1]), + (col1[row_index].as_slice(), col2[row_index].as_slice()) + ); row_index += 1; } } else { @@ -670,16 +784,16 @@ mod tests { { let mut loaded_nippy = NippyJar::::load(file_path.path()).unwrap(); - assert!(loaded_nippy.compressor.is_some()); + assert!(loaded_nippy.compressor().is_some()); assert!(loaded_nippy.filter.is_some()); assert!(loaded_nippy.phf.is_some()); assert_eq!(loaded_nippy.user_header().block_start, block_start); let mut dicts = vec![]; - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() { dicts = zstd.generate_decompress_dictionaries().unwrap() } - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() { let mut cursor = NippyJarCursor::new( &loaded_nippy, Some(zstd.generate_decompressors(&dicts).unwrap()), @@ -689,7 +803,10 @@ mod tests { // Iterate over compressed values and compare let mut row_num = 0usize; while let Some(row) = cursor.next_row().unwrap() { - assert_eq!((&row[0], &row[1]), (&data[0][row_num], &data[1][row_num])); + assert_eq!( + (row[0], row[1]), + (data[0][row_num].as_slice(), data[1][row_num].as_slice()) + ); row_num += 1; } @@ -700,12 +817,20 @@ mod tests { for (row_num, (v0, v1)) in data { // Simulates `by_hash` queries by iterating col1 values, which were used to // create the inner index. - let row_by_value = cursor.row_by_key(v0).unwrap().unwrap(); - assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1)); + { + let row_by_value = cursor + .row_by_key(v0) + .unwrap() + .unwrap() + .iter() + .map(|a| a.to_vec()) + .collect::>(); + assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1)); - // Simulates `by_number` queries - let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap(); - assert_eq!(row_by_value, row_by_num); + // Simulates `by_number` queries + let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap(); + assert_eq!(row_by_value, row_by_num); + } } } } @@ -738,10 +863,10 @@ mod tests { let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); let mut dicts = vec![]; - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() { dicts = zstd.generate_decompress_dictionaries().unwrap() } - if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() { let mut cursor = NippyJarCursor::new( &loaded_nippy, Some(zstd.generate_decompressors(&dicts).unwrap()), @@ -763,7 +888,10 @@ mod tests { let row_by_value = cursor .row_by_key_with_cols::(v0) .unwrap() - .unwrap(); + .unwrap() + .iter() + .map(|a| a.to_vec()) + .collect::>(); assert_eq!((&row_by_value[0], &row_by_value[1]), (*v0, *v1)); // Simulates `by_number` queries @@ -782,7 +910,10 @@ mod tests { let row_by_value = cursor .row_by_key_with_cols::(v0) .unwrap() - .unwrap(); + .unwrap() + .iter() + .map(|a| a.to_vec()) + .collect::>(); assert_eq!(row_by_value.len(), 1); assert_eq!(&row_by_value[0], *v0); @@ -803,7 +934,10 @@ mod tests { let row_by_value = cursor .row_by_key_with_cols::(v0) .unwrap() - .unwrap(); + .unwrap() + .iter() + .map(|a| a.to_vec()) + .collect::>(); assert_eq!(row_by_value.len(), 1); assert_eq!(&row_by_value[0], *v1); diff --git a/crates/storage/nippy-jar/src/phf/fmph.rs b/crates/storage/nippy-jar/src/phf/fmph.rs index e540bae64..8753334b5 100644 --- 
a/crates/storage/nippy-jar/src/phf/fmph.rs +++ b/crates/storage/nippy-jar/src/phf/fmph.rs @@ -60,7 +60,6 @@ impl PartialEq for Fmph { impl std::fmt::Debug for Fmph { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Fmph") - .field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes())) .field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes())) .finish_non_exhaustive() } diff --git a/crates/storage/nippy-jar/src/phf/go_fmph.rs b/crates/storage/nippy-jar/src/phf/go_fmph.rs index fd244cd1f..b2ed28f68 100644 --- a/crates/storage/nippy-jar/src/phf/go_fmph.rs +++ b/crates/storage/nippy-jar/src/phf/go_fmph.rs @@ -60,7 +60,6 @@ impl PartialEq for GoFmph { impl std::fmt::Debug for GoFmph { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GoFmph") - .field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes())) .field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes())) .finish_non_exhaustive() } diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs index e53ebaa93..72aed71d0 100644 --- a/crates/storage/provider/src/providers/snapshot.rs +++ b/crates/storage/provider/src/providers/snapshot.rs @@ -3,26 +3,37 @@ use reth_db::{ table::{Decompress, Table}, HeaderTD, }; -use reth_interfaces::RethResult; -use reth_nippy_jar::{NippyJar, NippyJarCursor}; +use reth_interfaces::{provider::ProviderError, RethResult}; +use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor}; use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256}; use std::ops::RangeBounds; /// SnapshotProvider /// -/// WIP Rudimentary impl just for testes +/// WIP Rudimentary impl just for tests /// TODO: should be able to walk through snapshot files/block_ranges /// TODO: Arc over NippyJars and/or NippyJarCursors (LRU) #[derive(Debug)] pub struct SnapshotProvider<'a> { /// NippyJar pub jar: &'a NippyJar, + /// Starting snapshot block + pub jar_start_block: u64, } impl<'a> SnapshotProvider<'a> { - fn cursor(&self) -> NippyJarCursor<'a> { + /// Creates cursor + pub fn cursor(&self) -> NippyJarCursor<'a> { NippyJarCursor::new(self.jar, None).unwrap() } + + /// Creates cursor with zstd decompressors + pub fn cursor_with_decompressors( + &self, + decompressors: Vec>, + ) -> NippyJarCursor<'a> { + NippyJarCursor::new(self.jar, Some(decompressors)).unwrap() + } } impl<'a> HeaderProvider for SnapshotProvider<'a> { @@ -31,7 +42,7 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> { let mut cursor = self.cursor(); let header = Header::decompress( - &cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0], + cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0], ) .unwrap(); @@ -43,8 +54,14 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> { Ok(None) } - fn header_by_number(&self, _num: BlockNumber) -> RethResult> { - unimplemented!(); + fn header_by_number(&self, num: BlockNumber) -> RethResult> { + Header::decompress( + self.cursor() + .row_by_number_with_cols::<0b01, 2>((num - self.jar_start_block) as usize)? 
+ .ok_or(ProviderError::HeaderNotFound(num.into()))?[0], + ) + .map(Some) + .map_err(Into::into) } fn header_td(&self, block_hash: &BlockHash) -> RethResult> { @@ -53,8 +70,8 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> { let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap(); - let header = Header::decompress(&row[0]).unwrap(); - let td = ::Value::decompress(&row[1]).unwrap(); + let header = Header::decompress(row[0]).unwrap(); + let td = ::Value::decompress(row[1]).unwrap(); if &header.hash_slow() == block_hash { return Ok(Some(td.0)) @@ -166,6 +183,7 @@ mod test { create_snapshot_T1_T2::( &tx, range, + None, none_vec, Some(hashes), row_count as usize, @@ -179,7 +197,7 @@ mod test { let jar = NippyJar::load_without_header(snap_file.path()).unwrap(); let db_provider = factory.provider().unwrap(); - let snap_provider = SnapshotProvider { jar: &jar }; + let snap_provider = SnapshotProvider { jar: &jar, jar_start_block: 0 }; assert!(!headers.is_empty());
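Putting the pieces together, the read-back path that both the provider test above and the headers benchmark rely on reduces to roughly the sketch below. `read_back_sketch` and `snap_file` are illustrative names, and the file is assumed to use lz4 or no compression; a zstd-dictionary jar additionally needs the decompressors wired up by `prepare_jar_provider`.

    use reth_nippy_jar::NippyJar;
    use reth_provider::{providers::SnapshotProvider, HeaderProvider};

    fn read_back_sketch(snap_file: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
        // Loads the data file and its `{path}.idx` companion written by `freeze`.
        let jar = NippyJar::load_without_header(snap_file)?;

        // `jar_start_block` translates absolute block numbers into row numbers,
        // since row 0 of the jar is the first block of the snapshotted range.
        let provider = SnapshotProvider { jar: &jar, jar_start_block: 0 };

        // Served straight from the memory-mapped snapshot, no database involved.
        let genesis_header = provider.header_by_number(0)?;
        assert!(genesis_header.is_some());
        Ok(())
    }

On the command-line side, this is the path exercised when the new subcommand is run with `--bench`, or with `--only-bench` against previously generated snapshot files.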