feat: add reth db snapshot <TYPE> command (#4889)

Author: joshieDo
Date: 2023-10-06 17:33:56 +01:00
Committed by: GitHub
Parent: 529635f8d4
Commit: 9c8eca6a49
19 changed files with 954 additions and 159 deletions

Cargo.lock (generated)
View File

@ -823,17 +823,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bloomfilter"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934"
dependencies = [
"bit-vec",
"getrandom 0.2.10",
"siphasher 1.0.0",
]
[[package]]
name = "blst"
version = "0.3.11"
@ -4033,6 +4022,12 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
[[package]]
name = "mach2"
version = "0.4.1"
@ -4779,7 +4774,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher 0.3.11",
"siphasher",
]
[[package]]
@ -5421,6 +5416,7 @@ dependencies = [
"human_bytes",
"humantime",
"hyper",
"itertools 0.11.0",
"jemalloc-ctl",
"jemallocator",
"metrics",
@ -5430,6 +5426,7 @@ dependencies = [
"pin-project",
"pretty_assertions",
"proptest",
"rand 0.8.5",
"reth-auto-seal-consensus",
"reth-basic-payload-builder",
"reth-beacon-consensus",
@ -5444,6 +5441,7 @@ dependencies = [
"reth-net-nat",
"reth-network",
"reth-network-api",
"reth-nippy-jar",
"reth-payload-builder",
"reth-primitives",
"reth-provider",
@ -5639,6 +5637,7 @@ dependencies = [
"reth-metrics",
"reth-nippy-jar",
"reth-primitives",
"reth-tracing",
"secp256k1",
"serde",
"serde_json",
@ -5978,9 +5977,10 @@ version = "0.1.0-alpha.10"
dependencies = [
"anyhow",
"bincode",
"bloomfilter",
"bytes",
"cuckoofilter",
"hex",
"lz4_flex",
"memmap2 0.7.1",
"ph",
"rand 0.8.5",
@ -5988,6 +5988,8 @@ dependencies = [
"sucds 0.8.1",
"tempfile",
"thiserror",
"tracing",
"tracing-appender",
"zstd",
]
@ -7216,12 +7218,6 @@ version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "siphasher"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe"
[[package]]
name = "sketches-ddsketch"
version = "0.2.1"

View File

@ -50,6 +50,7 @@ reth-discv4 = { path = "../../crates/net/discv4" }
reth-prune = { path = "../../crates/prune" }
reth-snapshot = { path = "../../crates/snapshot" }
reth-trie = { path = "../../crates/trie" }
reth-nippy-jar = { path = "../../crates/storage/nippy-jar" }
# crypto
alloy-rlp.workspace = true
@ -76,6 +77,7 @@ metrics.workspace = true
# test vectors generation
proptest.workspace = true
rand.workspace = true
# tui
comfy-table = "7.0"
@ -102,6 +104,7 @@ pretty_assertions = "1.3.0"
humantime = "2.1.0"
const-str = "0.5.6"
boyer-moore-magiclen = "0.2.16"
itertools.workspace = true
[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }

View File

@ -24,6 +24,7 @@ mod clear;
mod diff;
mod get;
mod list;
mod snapshots;
/// DB List TUI
mod tui;
@ -85,6 +86,8 @@ pub enum Subcommands {
},
/// Deletes all table entries
Clear(clear::Command),
/// Snapshots tables from database
Snapshot(snapshots::Command),
/// Lists current and local database versions
Version,
/// Returns the full database path
@ -210,6 +213,9 @@ impl Command {
let db = open_db(&db_path, self.db.log_level)?;
command.execute(&db)?;
}
Subcommands::Snapshot(command) => {
command.execute(&db_path, self.db.log_level, self.chain.clone())?;
}
Subcommands::Version => {
let local_db_version = match get_db_version(&db_path) {
Ok(version) => Some(version),

View File

@ -0,0 +1,48 @@
use super::JarConfig;
use reth_db::DatabaseEnvRO;
use reth_primitives::ChainSpec;
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{sync::Arc, time::Instant};
#[derive(Debug)]
pub(crate) enum BenchKind {
Walk,
RandomAll,
RandomOne,
RandomHash,
}
pub(crate) fn bench<F1, F2>(
bench_kind: BenchKind,
db: (DatabaseEnvRO, Arc<ChainSpec>),
jar_config: JarConfig,
mut snapshot_method: F1,
database_method: F2,
) -> eyre::Result<()>
where
F1: FnMut() -> eyre::Result<()>,
F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
{
let (mode, compression, phf) = jar_config;
let (db, chain) = db;
println!();
println!("############");
println!("## [{mode:?}] [{compression:?}] [{phf:?}] [{bench_kind:?}]");
{
let start = Instant::now();
snapshot_method()?;
let end = start.elapsed().as_micros();
println!("# snapshot {bench_kind:?} | {end} μs");
}
{
let factory = ProviderFactory::new(db, chain);
let provider = factory.provider()?;
let start = Instant::now();
database_method(provider)?;
let end = start.elapsed().as_micros();
println!("# database {bench_kind:?} | {end} μs");
}
Ok(())
}
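
The `bench` helper above runs the snapshot closure and the database closure back to back and prints each elapsed time in microseconds. A minimal, self-contained sketch of that timing pattern (illustrative only, not part of this diff; it assumes the `eyre` crate already used throughout the commit, and `time_micros` is a hypothetical name):

use std::time::Instant;

// Run the closure once and report elapsed microseconds, mirroring what `bench`
// does for each of its two closures.
fn time_micros(label: &str, mut f: impl FnMut() -> eyre::Result<()>) -> eyre::Result<()> {
    let start = Instant::now();
    f()?;
    println!("# {label} | {} μs", start.elapsed().as_micros());
    Ok(())
}

fn main() -> eyre::Result<()> {
    time_micros("snapshot Walk", || Ok(()))?;
    time_micros("database Walk", || Ok(()))
}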

View File

@ -0,0 +1,192 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction, Rows, Snapshots,
};
use crate::utils::DbTool;
use rand::{seq::SliceRandom, Rng};
use reth_db::{
cursor::DbCursorRO, database::Database, open_db_read_only, snapshot::create_snapshot_T1_T2,
table::Decompress, tables, transaction::DbTx, DatabaseEnvRO,
};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{BlockNumber, ChainSpec, Header};
use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
use std::{path::Path, sync::Arc};
use tables::*;
impl Command {
pub(crate) fn generate_headers_snapshot(
&self,
tool: &DbTool<'_, DatabaseEnvRO>,
compression: Compression,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mut jar = self.prepare_jar(2, (Snapshots::Headers, compression, phf), tool, || {
// Generates the dataset to train a zstd dictionary if necessary, with the most recent
// rows (at most 1000).
let dataset = tool.db.view(|tx| {
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::Headers>>()?;
let v1 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::HeaderTD>>()?;
let v2 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
Ok::<Rows, eyre::Error>(vec![v1, v2])
})??;
Ok(dataset)
})?;
tool.db.view(|tx| {
// Hacky type inference. TODO fix
let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
let _ = none_vec.take();
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>()?;
let mut hashes = None;
if self.with_filters {
hashes = Some(
cursor
.walk(Some(RawKey::from(self.from as u64)))?
.take(self.block_interval)
.map(|row| {
row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())
}),
);
}
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
tx,
self.from as u64..=(self.from as u64 + self.block_interval as u64),
None,
// We already prepared the dictionary beforehand
none_vec,
hashes,
self.block_interval,
&mut jar,
)
})??;
Ok(())
}
pub(crate) fn bench_headers_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
compression: Compression,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mode = Snapshots::Headers;
let jar_config = (mode, compression, phf);
let mut row_indexes = (self.from..(self.from + self.block_interval)).collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load_without_header(&self.get_file_path(jar_config))?;
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
|| {
for num in row_indexes.iter() {
Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>(num - self.from)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?[0],
)?;
// TODO: replace with the below once SnapshotProvider eventually re-uses the cursor
// provider.header_by_number(num as
// u64)?.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
}
Ok(())
},
|provider| {
for num in row_indexes.iter() {
provider
.header_by_number(*num as u64)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
}
Ok(())
},
)?;
// For random walk
row_indexes.shuffle(&mut rng);
}
// BENCHMARK QUERYING A RANDOM HEADER BY NUMBER
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
|| {
Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?[0],
)?;
Ok(())
},
|provider| {
provider
.header_by_number(num as u64)?
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?;
Ok(())
},
)?;
}
// BENCHMARK QUERYING A RANDOM HEADER BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let header_hash =
ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();
bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
|| {
let header = Header::decompress(
cursor
.row_by_key_with_cols::<0b01, 2>(header_hash.as_slice())?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?[0],
)?;
// Might be a false positive, so in the real world we have to validate it
assert!(header.hash_slow() == header_hash);
Ok(())
},
|provider| {
provider
.header(&header_hash)?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?;
Ok(())
},
)?;
}
Ok(())
}
}
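
The benchmarks above read snapshot rows with `row_by_number_with_cols::<0b01, 2>` and `row_by_key_with_cols::<0b01, 2>`. A small sketch of the assumed mask semantics (illustrative only, not the commit's implementation): bit `i` of `MASK` selects column `i`, so `0b01` over the two-column (Headers, HeaderTD) jar decodes only the header, while `0b11` (used later by the snapshot provider for total difficulty) decodes both.

// Illustrative only: which column indexes a `MASK` over `COLUMNS` columns selects.
fn selected_columns<const MASK: usize, const COLUMNS: usize>() -> Vec<usize> {
    (0..COLUMNS).filter(|&col| MASK & (1 << col) != 0).collect()
}

fn main() {
    assert_eq!(selected_columns::<0b01, 2>(), vec![0]);
    assert_eq!(selected_columns::<0b11, 2>(), vec![0, 1]);
}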

View File

@ -0,0 +1,216 @@
use crate::utils::DbTool;
use clap::{clap_derive::ValueEnum, Parser};
use eyre::WrapErr;
use itertools::Itertools;
use reth_db::{database::Database, open_db_read_only, table::Table, tables, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{
compression::{DecoderDictionary, Decompressor},
NippyJar,
};
use reth_primitives::ChainSpec;
use reth_provider::providers::SnapshotProvider;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
mod bench;
mod headers;
pub(crate) type Rows = Vec<Vec<Vec<u8>>>;
pub(crate) type JarConfig = (Snapshots, Compression, PerfectHashingFunction);
#[derive(Parser, Debug)]
/// Arguments for the `reth db snapshot` command.
pub struct Command {
/// Snapshot categories to generate.
modes: Vec<Snapshots>,
/// Starting block for the snapshot.
#[arg(long, short, default_value = "0")]
from: usize,
/// Number of blocks in the snapshot.
#[arg(long, short, default_value = "500000")]
block_interval: usize,
/// Flag to enable database-to-snapshot benchmarking.
#[arg(long, default_value = "false")]
bench: bool,
/// Flag to skip snapshot creation and only run benchmarks on existing snapshots.
#[arg(long, default_value = "false")]
only_bench: bool,
/// Compression algorithms to use.
#[arg(long, short, value_delimiter = ',', default_value = "lz4")]
compression: Vec<Compression>,
/// Flag to enable inclusion list filters and PHFs.
#[arg(long, default_value = "true")]
with_filters: bool,
/// Specifies the perfect hashing function to use.
#[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "mphf"))]
phf: Vec<PerfectHashingFunction>,
}
impl Command {
/// Execute `db snapshot` command
pub fn execute(
self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
let all_combinations = self
.modes
.iter()
.cartesian_product(self.compression.iter())
.cartesian_product(self.phf.iter());
{
let db = open_db_read_only(db_path, None)?;
let tool = DbTool::new(&db, chain.clone())?;
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
Snapshots::Headers => {
self.generate_headers_snapshot(&tool, *compression, *phf)?
}
Snapshots::Transactions => todo!(),
Snapshots::Receipts => todo!(),
}
}
}
}
if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
match mode {
Snapshots::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
*phf,
)?,
Snapshots::Transactions => todo!(),
Snapshots::Receipts => todo!(),
}
}
}
Ok(())
}
/// Returns a [`SnapshotProvider`] of the provided [`NippyJar`], alongside a list of
/// [`DecoderDictionary`] and [`Decompressor`] if necessary.
fn prepare_jar_provider<'a>(
&self,
jar: &'a mut NippyJar,
dictionaries: &'a mut Option<Vec<DecoderDictionary<'_>>>,
) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
let mut decompressors: Vec<Decompressor<'_>> = vec![];
if let Some(reth_nippy_jar::compression::Compressors::Zstd(zstd)) = jar.compressor_mut() {
if zstd.use_dict {
*dictionaries = zstd.generate_decompress_dictionaries();
decompressors = zstd.generate_decompressors(dictionaries.as_ref().expect("qed"))?;
}
}
Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from as u64 }, decompressors))
}
/// Returns a [`NippyJar`] according to the desired configuration.
fn prepare_jar<F: Fn() -> eyre::Result<Rows>>(
&self,
num_columns: usize,
jar_config: JarConfig,
tool: &DbTool<'_, DatabaseEnvRO>,
prepare_compression: F,
) -> eyre::Result<NippyJar> {
let (mode, compression, phf) = jar_config;
let snap_file = self.get_file_path(jar_config);
let table_name = match mode {
Snapshots::Headers => tables::Headers::NAME,
Snapshots::Transactions | Snapshots::Receipts => tables::Transactions::NAME,
};
let total_rows = tool.db.view(|tx| {
let table_db = tx.inner.open_db(Some(table_name)).wrap_err("Could not open db.")?;
let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {}", table_name))?;
Ok::<usize, eyre::Error>((stats.entries() - self.from).min(self.block_interval))
})??;
assert!(
total_rows >= self.block_interval,
"Not enough rows on database {} < {}.",
total_rows,
self.block_interval
);
let mut nippy_jar = NippyJar::new_without_header(num_columns, snap_file.as_path());
nippy_jar = match compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset)?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};
if self.with_filters {
nippy_jar = nippy_jar.with_cuckoo_filter(self.block_interval);
nippy_jar = match phf {
PerfectHashingFunction::Mphf => nippy_jar.with_mphf(),
PerfectHashingFunction::GoMphf => nippy_jar.with_gomphf(),
};
}
Ok(nippy_jar)
}
/// Generates a filename according to the desired configuration.
fn get_file_path(&self, jar_config: JarConfig) -> PathBuf {
let (mode, compression, phf) = jar_config;
format!(
"snapshot_{mode:?}_{}_{}_{compression:?}_{phf:?}",
self.from,
self.from + self.block_interval
)
.into()
}
}
#[derive(Debug, Copy, Clone, ValueEnum)]
pub(crate) enum Snapshots {
Headers,
Transactions,
Receipts,
}
#[derive(Debug, Copy, Clone, ValueEnum, Default)]
pub(crate) enum Compression {
Lz4,
Zstd,
ZstdWithDictionary,
#[default]
Uncompressed,
}
#[derive(Debug, Copy, Clone, ValueEnum)]
pub(crate) enum PerfectHashingFunction {
Mphf,
GoMphf,
}
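
`execute` above expands every selected mode, compression algorithm and PHF into its own snapshot/benchmark run via `cartesian_product`. For example, `reth db snapshot headers --compression lz4,zstd-with-dictionary --bench` (assuming clap's default kebab-case value names) would expand to two configurations. An illustrative sketch of that expansion and of the file names `get_file_path` derives from it (not part of this diff):

use itertools::Itertools;

fn main() {
    let modes = ["Headers"];
    let compression = ["Lz4", "ZstdWithDictionary"];
    let phf = ["Mphf", "GoMphf"];
    for ((mode, comp), phf) in
        modes.iter().cartesian_product(compression.iter()).cartesian_product(phf.iter())
    {
        // With the defaults (`from = 0`, `block_interval = 500000`), the Headers/Lz4/Mphf
        // combination maps to a file named `snapshot_Headers_0_500000_Lz4_Mphf`.
        println!("run: {mode} / {comp} / {phf}");
    }
}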

View File

@ -15,6 +15,7 @@ reth-interfaces.workspace = true
reth-codecs = { path = "../codecs" }
reth-libmdbx = { path = "../libmdbx-rs", optional = true, features = ["return-borrowed"] }
reth-nippy-jar = { path = "../nippy-jar" }
reth-tracing = { path = "../../tracing" }
# codecs
serde = { workspace = true, default-features = false }

View File

@ -8,6 +8,7 @@ use crate::{
};
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use reth_tracing::tracing::*;
use std::{error::Error as StdError, ops::RangeInclusive};
/// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`] and
@ -25,6 +26,7 @@ macro_rules! generate_snapshot_func {
///
/// * `tx`: Database transaction.
/// * `range`: Data range for columns in tables.
/// * `additional`: Additional columns which can't be walked on straightforwardly.
/// * `keys`: Iterator of keys (e.g. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`.
/// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent.
/// * `row_count`: Total rows to add to `NippyJar`. Must match row count in `range`.
@ -37,6 +39,7 @@ macro_rules! generate_snapshot_func {
(
tx: &impl DbTx<'tx>,
range: RangeInclusive<K>,
additional: Option<Vec<Box<dyn Iterator<Item = Result<Vec<u8>, Box<dyn StdError + Send + Sync>>>>>>,
dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
row_count: usize,
@ -44,16 +47,23 @@ macro_rules! generate_snapshot_func {
) -> RethResult<()>
where K: Key + Copy
{
let additional = additional.unwrap_or_default();
debug!(target: "reth::snapshot", ?range, "Creating snapshot {:?} and {} more columns.", vec![$($tbl::NAME,)+], additional.len());
let range: RangeInclusive<RawKey<K>> = RawKey::new(*range.start())..=RawKey::new(*range.end());
// Create PHF and Filter if required
if let Some(keys) = keys {
debug!(target: "reth::snapshot", "Calculating Filter, PHF and offset index list");
nippy_jar.prepare_index(keys, row_count)?;
debug!(target: "reth::snapshot", "Filter, PHF and offset index list calculated.");
}
// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
debug!(target: "reth::snapshot", "Creating compression dictionaries.");
nippy_jar.prepare_compression(data_sets)?;
debug!(target: "reth::snapshot", "Compression dictionaries created.");
}
// Creates the cursors for the columns
@ -75,7 +85,12 @@ macro_rules! generate_snapshot_func {
$(Box::new([< $tbl _iter>]),)+
];
nippy_jar.freeze(col_iterators, row_count as u64)?;
debug!(target: "reth::snapshot", jar=?nippy_jar, "Generating snapshot file.");
nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?;
debug!(target: "reth::snapshot", jar=?nippy_jar, "Snapshot file generated.");
Ok(())
}
@ -84,4 +99,4 @@ macro_rules! generate_snapshot_func {
};
}
generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4),);
generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);
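
The `[< ... >]` groups in the macro body above (e.g. `[< $tbl _iter>]`) concatenate tokens into new identifiers, which is how names such as `create_snapshot_T1_T2` are produced for each tuple of tables. A minimal sketch of that trick (assuming the `paste` crate, which provides this syntax; not part of this diff):

macro_rules! generate_iters {
    ($($tbl:ident),+) => {
        paste::paste! {
            $(
                // `[<$tbl _iter>]` pastes e.g. `Headers` and `_iter` into `Headers_iter`.
                #[allow(non_snake_case)]
                fn [<$tbl _iter>]() -> &'static str {
                    stringify!([<$tbl _iter>])
                }
            )+
        }
    };
}

generate_iters!(Headers, HeaderTD);

fn main() {
    assert_eq!(Headers_iter(), "Headers_iter");
    assert_eq!(HeaderTD_iter(), "HeaderTD_iter");
}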

View File

@ -12,20 +12,28 @@ description = "Immutable data store format"
name = "reth_nippy_jar"
[dependencies]
memmap2 = "0.7.1"
bloomfilter = "1"
zstd = { version = "0.12", features = ["experimental", "zdict_builder"] }
# filter
ph = "0.8.0"
thiserror .workspace = true
cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] }
# compression
zstd = { version = "0.12", features = ["experimental", "zdict_builder"] }
lz4_flex = { version = "0.11", default-features = false }
# offsets
sucds = "~0.8"
memmap2 = "0.7.1"
bincode = "1.3"
serde = { version = "1.0", features = ["derive"] }
bytes.workspace = true
cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] }
tempfile.workspace = true
sucds = "~0.8"
tracing = "0.1.0"
tracing-appender = "0.2"
anyhow = "1.0"
thiserror.workspace = true
hex = "*"
[dev-dependencies]
rand = { version = "0.8", features = ["small_rng"] }

View File

@ -0,0 +1,83 @@
use crate::{compression::Compression, NippyJarError};
use serde::{Deserialize, Serialize};
/// Wrapper type for `lz4_flex` that implements [`Compression`].
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[non_exhaustive]
pub struct Lz4;
impl Compression for Lz4 {
fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError> {
let previous_length = dest.len();
// SAFETY: We're setting len to the existing capacity.
unsafe {
dest.set_len(dest.capacity());
}
match lz4_flex::decompress_into(value, &mut dest[previous_length..]) {
Ok(written) => {
// SAFETY: `decompress_into` can only write if there's enough capacity. Therefore, it
// shouldn't write more than our capacity.
unsafe {
dest.set_len(previous_length + written);
}
Ok(())
}
Err(_) => {
// SAFETY: we are resetting it to the previous value.
unsafe {
dest.set_len(previous_length);
}
Err(NippyJarError::OutputTooSmall)
}
}
}
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
let mut multiplier = 1;
loop {
match lz4_flex::decompress(value, multiplier * value.len()) {
Ok(v) => return Ok(v),
Err(err) => {
multiplier *= 2;
if multiplier == 16 {
return Err(NippyJarError::Custom(err.to_string()))
}
}
}
}
}
fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError> {
let previous_length = dest.len();
// SAFETY: We're setting len to the existing capacity.
unsafe {
dest.set_len(dest.capacity());
}
match lz4_flex::compress_into(src, &mut dest[previous_length..]) {
Ok(written) => {
// SAFETY: `compress_into` can only write if there's enough capacity. Therefore, it
// shouldn't write more than our capacity.
unsafe {
dest.set_len(previous_length + written);
}
Ok(written)
}
Err(_) => {
// SAFETY: we are resetting it to the previous value.
unsafe {
dest.set_len(previous_length);
}
Err(NippyJarError::OutputTooSmall)
}
}
}
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
Ok(lz4_flex::compress(src))
}
}
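
For context on why `Lz4::decompress` above has to guess an output size and retry: the `lz4_flex` block format does not record the uncompressed length, so the caller must supply a minimum output size. A minimal round-trip sketch with the same crate-level helpers the wrapper calls (illustrative only, not part of this diff):

fn main() {
    let original = b"nippy jar column value".to_vec();
    let compressed = lz4_flex::compress(&original);
    // The size hint must be at least the uncompressed length, hence the doubling
    // retry loop in `Lz4::decompress` when the real size is unknown.
    let decompressed = lz4_flex::decompress(&compressed, original.len()).unwrap();
    assert_eq!(decompressed, original);
}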

View File

@ -1,17 +1,24 @@
use crate::NippyJarError;
use serde::{Deserialize, Serialize};
use std::io::Write;
mod zstd;
pub use self::zstd::{Zstd, ZstdState};
pub use self::zstd::{DecoderDictionary, Decompressor, Zstd, ZstdState};
mod lz4;
pub use self::lz4::Lz4;
/// Trait that will compress column values
pub trait Compression: Serialize + for<'a> Deserialize<'a> {
/// Appends decompressed data to the dest buffer. Requires `dest` to have sufficient capacity.
fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError>;
/// Returns decompressed data.
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError>;
/// Compresses data from `src` to `dest`
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError>;
/// Appends compressed data from `src` to `dest`. Requires `dest` to have sufficient
/// capacity.
///
/// Returns number of bytes written to `dest`.
fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError>;
/// Compresses data from `src`
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError>;
@ -37,36 +44,54 @@ pub trait Compression: Serialize + for<'a> Deserialize<'a> {
#[cfg_attr(test, derive(PartialEq))]
pub enum Compressors {
Zstd(Zstd),
// Avoids irrefutable let errors. Remove this after adding another one.
Unused,
Lz4(Lz4),
}
impl Compression for Compressors {
fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.decompress_to(value, dest),
Compressors::Lz4(lz4) => lz4.decompress_to(value, dest),
}
}
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.decompress(value),
Compressors::Unused => unimplemented!(),
Compressors::Lz4(lz4) => lz4.decompress(value),
}
}
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.compress_to(src, dest),
Compressors::Unused => unimplemented!(),
fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError> {
let initial_capacity = dest.capacity();
loop {
let result = match self {
Compressors::Zstd(zstd) => zstd.compress_to(src, dest),
Compressors::Lz4(lz4) => lz4.compress_to(src, dest),
};
match result {
Ok(v) => return Ok(v),
Err(err) => match err {
NippyJarError::OutputTooSmall => {
dest.reserve(initial_capacity);
}
_ => return Err(err),
},
}
}
}
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.compress(src),
Compressors::Unused => unimplemented!(),
Compressors::Lz4(lz4) => lz4.compress(src),
}
}
fn is_ready(&self) -> bool {
match self {
Compressors::Zstd(zstd) => zstd.is_ready(),
Compressors::Unused => unimplemented!(),
Compressors::Lz4(lz4) => lz4.is_ready(),
}
}
@ -76,7 +101,7 @@ impl Compression for Compressors {
) -> Result<(), NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.prepare_compression(columns),
Compressors::Unused => Ok(()),
Compressors::Lz4(lz4) => lz4.prepare_compression(columns),
}
}
}

View File

@ -4,10 +4,9 @@ use std::{
fs::File,
io::{Read, Write},
};
use zstd::{
bulk::{Compressor, Decompressor},
dict::DecoderDictionary,
};
use tracing::*;
use zstd::bulk::Compressor;
pub use zstd::{bulk::Decompressor, dict::DecoderDictionary};
type RawDictionary = Vec<u8>;
@ -26,7 +25,7 @@ pub struct Zstd {
/// Compression level. A level of `0` uses zstd's default (currently `3`).
pub(crate) level: i32,
/// Uses custom dictionaries to compress data.
pub(crate) use_dict: bool,
pub use_dict: bool,
/// Max size of a dictionary
pub(crate) max_dict_size: usize,
/// List of column dictionaries.
@ -87,6 +86,8 @@ impl Zstd {
let mut compressors = None;
if let Some(dictionaries) = &self.raw_dictionaries {
debug!(target: "nippy-jar", count=?dictionaries.len(), "Generating ZSTD compressor dictionaries.");
let mut cmp = Vec::with_capacity(dictionaries.len());
for dict in dictionaries {
@ -99,10 +100,11 @@ impl Zstd {
}
}
/// Compresses a value using a dictionary.
/// Compresses a value using a dictionary. Reserves additional capacity for `buffer` if
/// necessary.
pub fn compress_with_dictionary(
column_value: &[u8],
tmp_buf: &mut Vec<u8>,
buffer: &mut Vec<u8>,
handle: &mut File,
compressor: Option<&mut Compressor<'_>>,
) -> Result<(), NippyJarError> {
@ -112,16 +114,16 @@ impl Zstd {
// enough, the compressed buffer will actually be larger. We keep retrying.
// If we eventually fail, it probably means it's another kind of error.
let mut multiplier = 1;
while let Err(err) = compressor.compress_to_buffer(column_value, tmp_buf) {
tmp_buf.reserve(column_value.len() * multiplier);
while let Err(err) = compressor.compress_to_buffer(column_value, buffer) {
buffer.reserve(column_value.len() * multiplier);
multiplier += 1;
if multiplier == 5 {
return Err(NippyJarError::Disconnect(err))
}
}
handle.write_all(tmp_buf)?;
tmp_buf.clear();
handle.write_all(buffer)?;
buffer.clear();
} else {
handle.write_all(column_value)?;
}
@ -129,38 +131,46 @@ impl Zstd {
Ok(())
}
/// Decompresses a value using a dictionary to a user provided buffer.
/// Appends a decompressed value using a dictionary to a user provided buffer.
pub fn decompress_with_dictionary(
column_value: &[u8],
output: &mut Vec<u8>,
decompressor: &mut Decompressor<'_>,
) -> Result<(), NippyJarError> {
let mut multiplier = 1;
let previous_length = output.len();
// Just an estimation.
let required_capacity = column_value.len() * 2;
output.reserve(required_capacity.saturating_sub(output.capacity()));
// Decompressor requires the destination buffer to be big enough to write to, otherwise it
// fails. However, we don't know how big it will be. We keep retrying.
// If we eventually fail, it probably means it's another kind of error.
while let Err(err) = decompressor.decompress_to_buffer(column_value, output) {
output.reserve(
Decompressor::upper_bound(column_value).unwrap_or(required_capacity) * multiplier,
);
multiplier += 1;
if multiplier == 5 {
return Err(NippyJarError::Disconnect(err))
}
// SAFETY: We're setting len to the existing capacity.
unsafe {
output.set_len(output.capacity());
}
Ok(())
match decompressor.decompress_to_buffer(column_value, &mut output[previous_length..]) {
Ok(written) => {
// SAFETY: `decompress_to_buffer` can only write if there's enough capacity.
// Therefore, it shouldn't write more than our capacity.
unsafe {
output.set_len(previous_length + written);
}
Ok(())
}
Err(_) => {
// SAFETY: we are resetting it to the previous value.
unsafe {
output.set_len(previous_length);
}
Err(NippyJarError::OutputTooSmall)
}
}
}
}
impl Compression for Zstd {
fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError> {
let mut decoder = zstd::Decoder::with_dictionary(value, &[])?;
decoder.read_to_end(dest)?;
Ok(())
}
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
let mut decompressed = Vec::with_capacity(value.len() * 2);
let mut decoder = zstd::Decoder::new(value)?;
@ -168,13 +178,15 @@ impl Compression for Zstd {
Ok(decompressed)
}
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> {
fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError> {
let before = dest.len();
let mut encoder = zstd::Encoder::new(dest, self.level)?;
encoder.write_all(src)?;
encoder.finish()?;
let dest = encoder.finish()?;
Ok(())
Ok(dest.len() - before)
}
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {

View File

@ -1,10 +1,10 @@
use crate::{
compression::{Compression, Zstd},
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, Row,
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
};
use memmap2::Mmap;
use serde::{de::Deserialize, ser::Serialize};
use std::{clone::Clone, fs::File};
use std::{fs::File, ops::Range};
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;
@ -19,14 +19,13 @@ pub struct NippyJarCursor<'a, H = ()> {
file_handle: File,
/// Data file.
mmap_handle: Mmap,
/// Temporary buffer to unload data to (if necessary), without reallocating memory on each
/// retrieval.
tmp_buf: Vec<u8>,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
internal_buffer: Vec<u8>,
/// Cursor row position.
row: u64,
}
impl<'a, H> std::fmt::Debug for NippyJarCursor<'a, H>
impl<'a, H: std::fmt::Debug> std::fmt::Debug for NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + core::fmt::Debug,
{
@ -37,7 +36,7 @@ where
impl<'a, H> NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b>,
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug,
{
pub fn new(
jar: &'a NippyJar<H>,
@ -53,7 +52,8 @@ where
zstd_decompressors,
file_handle: file,
mmap_handle: mmap,
tmp_buf: vec![],
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(jar.max_row_size),
row: 0,
})
}
@ -69,7 +69,7 @@ where
///
/// Example usage would be querying a transactions file with a transaction hash which is **NOT**
/// stored in file.
pub fn row_by_key(&mut self, key: &[u8]) -> Result<Option<Row>, NippyJarError> {
pub fn row_by_key(&mut self, key: &[u8]) -> Result<Option<RefRow<'_>>, NippyJarError> {
if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) {
// TODO: is it worth it to parallelize both?
@ -93,13 +93,15 @@ where
}
/// Returns a row by its number.
pub fn row_by_number(&mut self, row: usize) -> Result<Option<Row>, NippyJarError> {
pub fn row_by_number(&mut self, row: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.row = row as u64;
self.next_row()
}
/// Returns the current value and advances the row.
pub fn next_row(&mut self) -> Result<Option<Row>, NippyJarError> {
pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
// Has reached the end
return Ok(None)
@ -114,7 +116,14 @@ where
self.row += 1;
Ok(Some(row))
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => &self.mmap_handle[range],
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
))
}
/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a
@ -127,7 +136,7 @@ where
pub fn row_by_key_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
key: &[u8],
) -> Result<Option<Row>, NippyJarError> {
) -> Result<Option<RefRow<'_>>, NippyJarError> {
if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) {
// TODO: is it worth it to parallelize both?
@ -154,7 +163,7 @@ where
pub fn row_by_number_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
row: usize,
) -> Result<Option<Row>, NippyJarError> {
) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.row = row as u64;
self.next_row_with_cols::<MASK, COLUMNS>()
}
@ -164,8 +173,8 @@ where
/// Uses a `MASK` to only read certain columns from the row.
pub fn next_row_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
) -> Result<Option<Row>, NippyJarError> {
debug_assert!(COLUMNS == self.jar.columns);
) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
// Has reached the end
@ -179,45 +188,68 @@ where
self.read_value(column, &mut row)?
}
}
self.row += 1;
Ok(Some(row))
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => &self.mmap_handle[range],
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
))
}
/// Takes the column index and reads the value for the corresponding column.
fn read_value(&mut self, column: usize, row: &mut Row) -> Result<(), NippyJarError> {
/// Takes the column index and reads the value range for the corresponding column.
fn read_value(
&mut self,
column: usize,
row: &mut Vec<ValueRange>,
) -> Result<(), NippyJarError> {
// Find out the offset of the column value
let offset_pos = self.row as usize * self.jar.columns + column;
let value_offset = self.jar.offsets.select(offset_pos).expect("should exist");
let column_value = if self.jar.offsets.len() == (offset_pos + 1) {
let column_offset_range = if self.jar.offsets.len() == (offset_pos + 1) {
// It's the last column of the last row
&self.mmap_handle[value_offset..]
value_offset..self.mmap_handle.len()
} else {
let next_value_offset = self.jar.offsets.select(offset_pos + 1).expect("should exist");
&self.mmap_handle[value_offset..next_value_offset]
value_offset..next_value_offset
};
if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() {
self.tmp_buf.clear();
let from: usize = self.internal_buffer.len();
if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) {
Zstd::decompress_with_dictionary(column_value, &mut self.tmp_buf, decompressor)?;
Zstd::decompress_with_dictionary(
&self.mmap_handle[column_offset_range],
&mut self.internal_buffer,
decompressor,
)?;
}
let to = self.internal_buffer.len();
debug_assert!(!self.tmp_buf.is_empty());
row.push(self.tmp_buf.clone());
} else if let Some(compression) = &self.jar.compressor {
row.push(ValueRange::Internal(from..to));
} else if let Some(compression) = self.jar.compressor() {
// Uses the chosen default decompressor
row.push(compression.decompress(column_value)?);
let from = self.internal_buffer.len();
compression
.decompress_to(&self.mmap_handle[column_offset_range], &mut self.internal_buffer)?;
let to = self.internal_buffer.len();
row.push(ValueRange::Internal(from..to));
} else {
// Not compressed
// TODO: return Cow<&> instead of copying if there's no compression
row.push(column_value.to_vec())
row.push(ValueRange::Mmap(column_offset_range));
}
Ok(())
}
}
/// Helper type that stores the range of the decompressed column value either on a `mmap` slice or
/// on the internal buffer.
enum ValueRange {
Mmap(Range<usize>),
Internal(Range<usize>),
}
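
A note on the switch from `Row` to `RefRow<'_>` above: the returned slices borrow from the cursor's mmap or internal buffer, so a caller that needs a row to outlive further cursor calls copies it out first, exactly as the updated tests later in this diff do with `.iter().map(|a| a.to_vec())`. A small illustrative sketch (not part of the commit):

fn own_row(row: Vec<&[u8]>) -> Vec<Vec<u8>> {
    // Copy each borrowed column slice into an owned buffer.
    row.iter().map(|col| col.to_vec()).collect()
}

fn main() {
    let backing = [vec![1u8, 2, 3], vec![4u8, 5]];
    let borrowed: Vec<&[u8]> = backing.iter().map(|v| v.as_slice()).collect();
    assert_eq!(own_row(borrowed), vec![vec![1, 2, 3], vec![4, 5]]);
}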

View File

@ -7,6 +7,8 @@ pub enum NippyJarError {
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error(transparent)]
Disconnect(#[from] std::io::Error),
#[error("{0}")]
Custom(String),
#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),
#[error(transparent)]
@ -33,4 +35,6 @@ pub enum NippyJarError {
PHFMissing,
#[error("NippyJar was built without an index.")]
UnsupportedFilterQuery,
#[error("Compression or decompression requires a bigger destination output.")]
OutputTooSmall,
}

View File

@ -17,6 +17,10 @@ pub struct Cuckoo {
impl Cuckoo {
pub fn new(max_capacity: usize) -> Self {
// CuckooFilter might return `NotEnoughSpace` even if there are remaining elements, if it's
// close to capacity. Therefore, we increase it.
let max_capacity = max_capacity + 100 + max_capacity / 3;
Cuckoo { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) }
}
}
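
A quick worked example of the padding formula above (illustrative only): with the default block interval of 500000, the requested capacity becomes 500000 + 100 + 166666 = 666766 slots.

fn padded_capacity(max_capacity: usize) -> usize {
    // Mirrors `Cuckoo::new` above: fixed slack plus a third of the requested capacity.
    max_capacity + 100 + max_capacity / 3
}

fn main() {
    assert_eq!(padded_capacity(500_000), 666_766);
}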

View File

@ -24,6 +24,7 @@ use sucds::{
mii_sequences::{EliasFano, EliasFanoBuilder},
Serializable,
};
use tracing::*;
pub mod filter;
use filter::{Cuckoo, InclusionFilter, InclusionFilters};
@ -43,8 +44,9 @@ pub use cursor::NippyJarCursor;
const NIPPY_JAR_VERSION: usize = 1;
/// A [`Row`] is a list of its selected column values.
type Row = Vec<Vec<u8>>;
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;
/// Alias type for a column value wrapped in `Result`
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
@ -73,7 +75,7 @@ pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
///
/// Ultimately, the `freeze` function yields two files: a data file containing both the data and its
/// configuration, and an index file that houses the offsets and offsets_index.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
/// The version of the NippyJar format.
@ -95,11 +97,34 @@ pub struct NippyJar<H = ()> {
/// Offsets within the file for each column value, arranged by row and column.
#[serde(skip)]
offsets: EliasFano,
/// Maximum uncompressed row size of the set. This will enable decompression without any
/// resizing of the output buffer.
#[serde(skip)]
max_row_size: usize,
/// Data path for file. Index file will be `{path}.idx`
#[serde(skip)]
path: Option<PathBuf>,
}
impl<H: std::fmt::Debug> std::fmt::Debug for NippyJar<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NippyJar")
.field("version", &self.version)
.field("user_header", &self.user_header)
.field("columns", &self.columns)
.field("compressor", &self.compressor)
.field("filter", &self.filter)
.field("phf", &self.phf)
.field("offsets_index (len)", &self.offsets_index.len())
.field("offsets_index (size in bytes)", &self.offsets_index.size_in_bytes())
.field("offsets (len)", &self.offsets.len())
.field("offsets (size in bytes)", &self.offsets.size_in_bytes())
.field("path", &self.path)
.field("max_row_size", &self.max_row_size)
.finish_non_exhaustive()
}
}
impl NippyJar<()> {
/// Creates a new [`NippyJar`] without an user-defined header data.
pub fn new_without_header(columns: usize, path: &Path) -> Self {
@ -119,7 +144,7 @@ impl NippyJar<()> {
impl<H> NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
H: Send + Sync + Serialize + for<'a> Deserialize<'a> + std::fmt::Debug,
{
/// Creates a new [`NippyJar`] with a user-defined header data.
pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
@ -127,6 +152,7 @@ where
version: NIPPY_JAR_VERSION,
user_header,
columns,
max_row_size: 0,
compressor: None,
filter: None,
phf: None,
@ -143,6 +169,12 @@ where
self
}
/// Adds [`compression::Lz4`] compression.
pub fn with_lz4(mut self) -> Self {
self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
self
}
/// Adds [`filter::Cuckoo`] filter.
pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self {
self.filter = Some(InclusionFilters::Cuckoo(Cuckoo::new(max_capacity)));
@ -166,6 +198,16 @@ where
&self.user_header
}
/// Gets a reference to the compressor.
pub fn compressor(&self) -> Option<&Compressors> {
self.compressor.as_ref()
}
/// Gets a mutable reference to the compressor.
pub fn compressor_mut(&mut self) -> Option<&mut Compressors> {
self.compressor.as_mut()
}
/// Loads the file configuration and returns [`Self`].
///
/// **The user must ensure the header type matches the one used during the jar's creation.**
@ -185,7 +227,8 @@ where
let mmap = unsafe { memmap2::Mmap::map(&offsets_file)? };
let mut offsets_reader = mmap.as_ref();
obj.offsets = EliasFano::deserialize_from(&mut offsets_reader)?;
obj.offsets_index = PrefixSummedEliasFano::deserialize_from(offsets_reader)?;
obj.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_reader)?;
obj.max_row_size = bincode::deserialize_from(offsets_reader)?;
Ok(obj)
}
@ -211,6 +254,7 @@ where
) -> Result<(), NippyJarError> {
// Makes any necessary preparations for the compressors
if let Some(compression) = &mut self.compressor {
debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
compression.prepare_compression(columns)?;
}
Ok(())
@ -226,15 +270,27 @@ where
values: impl IntoIterator<Item = ColumnResult<T>>,
row_count: usize,
) -> Result<(), NippyJarError> {
debug!(target: "nippy-jar", ?row_count, "Preparing index.");
let values = values.into_iter().collect::<Result<Vec<_>, _>>()?;
debug_assert!(
row_count == values.len(),
"Row count ({row_count}) differs from value list count ({}).",
values.len()
);
let mut offsets_index = vec![0; row_count];
// Builds perfect hashing function from the values
if let Some(phf) = self.phf.as_mut() {
debug!(target: "nippy-jar", ?row_count, values_count = ?values.len(), "Setting keys for perfect hashing function.");
phf.set_keys(&values)?;
}
if self.filter.is_some() || self.phf.is_some() {
debug!(target: "nippy-jar", ?row_count, "Creating filter and offsets_index.");
for (row_num, v) in values.into_iter().enumerate() {
if let Some(filter) = self.filter.as_mut() {
filter.add(v.as_ref())?;
@ -248,6 +304,7 @@ where
}
}
debug!(target: "nippy-jar", ?row_count, "Encoding offsets index list.");
self.offsets_index = PrefixSummedEliasFano::from_slice(&offsets_index)?;
Ok(())
}
@ -271,7 +328,7 @@ where
// Temporary buffer to avoid multiple reallocations if compressing to a buffer (e.g. zstd w/
// dict)
let mut tmp_buf = Vec::with_capacity(100);
let mut tmp_buf = Vec::with_capacity(1_000_000);
// Write all rows while taking all row start offsets
let mut row_number = 0u64;
@ -279,16 +336,21 @@ where
let mut column_iterators =
columns.into_iter().map(|v| v.into_iter()).collect::<Vec<_>>().into_iter();
debug!(target: "nippy-jar", compressor=?self.compressor, "Writing rows.");
loop {
let mut iterators = Vec::with_capacity(self.columns);
// Write the column value of each row
// TODO: iter_mut if we remove the IntoIterator interface.
let mut uncompressed_row_size = 0;
for (column_number, mut column_iter) in column_iterators.enumerate() {
offsets.push(file.stream_position()? as usize);
match column_iter.next() {
Some(Ok(value)) => {
uncompressed_row_size += value.len();
if let Some(compression) = &self.compressor {
// Special zstd case with dictionaries
if let (Some(dict_compressors), Compressors::Zstd(_)) =
@ -301,7 +363,9 @@ where
Some(dict_compressors.get_mut(column_number).expect("exists")),
)?;
} else {
compression.compress_to(&value, &mut file)?;
let before = tmp_buf.len();
let len = compression.compress_to(&value, &mut tmp_buf)?;
file.write_all(&tmp_buf[before..before + len])?;
}
} else {
file.write_all(&value)?;
@ -319,7 +383,10 @@ where
iterators.push(column_iter);
}
tmp_buf.clear();
row_number += 1;
self.max_row_size = self.max_row_size.max(uncompressed_row_size);
if row_number == total_rows {
break
}
@ -330,12 +397,16 @@ where
// Write offsets and offset index to file
self.freeze_offsets(offsets)?;
debug!(target: "nippy-jar", jar=?self, "Finished.");
Ok(())
}
/// Freezes offsets and its own index.
fn freeze_offsets(&mut self, offsets: Vec<usize>) -> Result<(), NippyJarError> {
if !offsets.is_empty() {
debug!(target: "nippy-jar", "Encoding offsets list.");
let mut builder =
EliasFanoBuilder::new(*offsets.last().expect("qed") + 1, offsets.len())?;
@ -344,9 +415,13 @@ where
}
self.offsets = builder.build().enable_rank();
}
debug!(target: "nippy-jar", path=?self.index_path(), "Writing offsets and offsets index to file.");
let mut file = File::create(self.index_path())?;
self.offsets.serialize_into(&mut file)?;
self.offsets_index.serialize_into(file)?;
self.offsets_index.serialize_into(&mut file)?;
self.max_row_size.serialize_into(file)?;
Ok(())
}
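
`freeze_offsets` above encodes the monotonically increasing byte offsets with an Elias–Fano sequence, and the cursor later recovers the i-th offset with `select(i)` to slice the data file. A minimal sketch of that encode/lookup cycle (illustrative only, assuming the `sucds` ~0.8 API imported above; `push` is assumed to be the builder's insertion method):

use sucds::mii_sequences::EliasFanoBuilder;

fn main() -> anyhow::Result<()> {
    // Byte offsets of four column values in a data file.
    let offsets = vec![0usize, 12, 40, 95];
    let mut builder = EliasFanoBuilder::new(offsets.last().unwrap() + 1, offsets.len())?;
    for offset in &offsets {
        builder.push(*offset)?;
    }
    let ef = builder.build().enable_rank();
    // The third value starts at byte 40, which is what the cursor slices the mmap with.
    assert_eq!(ef.select(2), Some(40));
    Ok(())
}
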
@ -370,6 +445,8 @@ where
let _ = phf.get_index(&[])?;
}
debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");
Ok(File::create(self.data_path())?)
}
@ -517,10 +594,6 @@ mod tests {
// // Add more columns until max_capacity
assert!(InclusionFilter::add(&mut nippy, &col1[2]).is_ok());
assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok());
assert!(matches!(
InclusionFilter::add(&mut nippy, &col1[4]),
Err(NippyJarError::FilterMaxCapacity)
));
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
@ -542,13 +615,13 @@ mod tests {
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor.is_none());
assert!(nippy.compressor().is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor.is_some());
assert!(nippy.compressor().is_some());
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor {
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady)));
// Make sure the number of column iterators match the initial set up ones.
@ -567,7 +640,7 @@ mod tests {
nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor {
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
assert!(matches!(
(&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())),
(compression::ZstdState::Ready, Some(columns)) if columns == num_columns
@ -580,11 +653,11 @@ mod tests {
assert_eq!(nippy, loaded_nippy);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
@ -594,12 +667,50 @@ mod tests {
// Iterate over compressed values and compare
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index]));
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
}
}
#[test]
fn test_lz4() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor().is_none());
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
assert!(nippy.compressor().is_some());
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
panic!("Expected Lz4 compressor")
}
}
#[test]
fn test_zstd_no_dictionaries() {
let (col1, col2) = test_data(None);
@ -608,18 +719,18 @@ mod tests {
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor.is_none());
assert!(nippy.compressor().is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
assert!(nippy.compressor.is_some());
assert!(nippy.compressor().is_some());
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
assert!(!zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
@ -627,7 +738,10 @@ mod tests {
// Iterate over compressed values and compare
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&col1[row_index], &col2[row_index]));
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
@ -670,16 +784,16 @@ mod tests {
{
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
assert!(loaded_nippy.compressor.is_some());
assert!(loaded_nippy.compressor().is_some());
assert!(loaded_nippy.filter.is_some());
assert!(loaded_nippy.phf.is_some());
assert_eq!(loaded_nippy.user_header().block_start, block_start);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
@ -689,7 +803,10 @@ mod tests {
// Iterate over compressed values and compare
let mut row_num = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&data[0][row_num], &data[1][row_num]));
assert_eq!(
(row[0], row[1]),
(data[0][row_num].as_slice(), data[1][row_num].as_slice())
);
row_num += 1;
}
@ -700,12 +817,20 @@ mod tests {
for (row_num, (v0, v1)) in data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor.row_by_key(v0).unwrap().unwrap();
assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1));
{
let row_by_value = cursor
.row_by_key(v0)
.unwrap()
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1));
// Simulates `by_number` queries
let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
assert_eq!(row_by_value, row_by_num);
// Simulates `by_number` queries
let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
assert_eq!(row_by_value, row_by_num);
}
}
}
}
@ -738,10 +863,10 @@ mod tests {
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
@ -763,7 +888,10 @@ mod tests {
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_FULL_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!((&row_by_value[0], &row_by_value[1]), (*v0, *v1));
// Simulates `by_number` queries
@ -782,7 +910,10 @@ mod tests {
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_BLOCK_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v0);
@ -803,7 +934,10 @@ mod tests {
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_WITHDRAWAL_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v1);

View File

@ -60,7 +60,6 @@ impl PartialEq for Fmph {
impl std::fmt::Debug for Fmph {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Fmph")
.field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes()))
.field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes()))
.finish_non_exhaustive()
}

View File

@ -60,7 +60,6 @@ impl PartialEq for GoFmph {
impl std::fmt::Debug for GoFmph {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GoFmph")
.field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes()))
.field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes()))
.finish_non_exhaustive()
}

View File

@ -3,26 +3,37 @@ use reth_db::{
table::{Decompress, Table},
HeaderTD,
};
use reth_interfaces::RethResult;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256};
use std::ops::RangeBounds;
/// SnapshotProvider
///
/// WIP Rudimentary impl just for testes
/// WIP Rudimentary impl just for tests
/// TODO: should be able to walk through snapshot files/block_ranges
/// TODO: Arc over NippyJars and/or NippyJarCursors (LRU)
#[derive(Debug)]
pub struct SnapshotProvider<'a> {
/// NippyJar
pub jar: &'a NippyJar,
/// Starting snapshot block
pub jar_start_block: u64,
}
impl<'a> SnapshotProvider<'a> {
fn cursor(&self) -> NippyJarCursor<'a> {
/// Creates a cursor.
pub fn cursor(&self) -> NippyJarCursor<'a> {
NippyJarCursor::new(self.jar, None).unwrap()
}
/// Creates a cursor with zstd decompressors.
pub fn cursor_with_decompressors(
&self,
decompressors: Vec<Decompressor<'a>>,
) -> NippyJarCursor<'a> {
NippyJarCursor::new(self.jar, Some(decompressors)).unwrap()
}
}
impl<'a> HeaderProvider for SnapshotProvider<'a> {
@ -31,7 +42,7 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
let mut cursor = self.cursor();
let header = Header::decompress(
&cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
)
.unwrap();
@ -43,8 +54,14 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
Ok(None)
}
fn header_by_number(&self, _num: BlockNumber) -> RethResult<Option<Header>> {
unimplemented!();
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()
.row_by_number_with_cols::<0b01, 2>((num - self.jar_start_block) as usize)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
.map_err(Into::into)
}
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
@ -53,8 +70,8 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
let header = Header::decompress(&row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(&row[1]).unwrap();
let header = Header::decompress(row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(row[1]).unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(td.0))
@ -166,6 +183,7 @@ mod test {
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
&tx,
range,
None,
none_vec,
Some(hashes),
row_count as usize,
@ -179,7 +197,7 @@ mod test {
let jar = NippyJar::load_without_header(snap_file.path()).unwrap();
let db_provider = factory.provider().unwrap();
let snap_provider = SnapshotProvider { jar: &jar };
let snap_provider = SnapshotProvider { jar: &jar, jar_start_block: 0 };
assert!(!headers.is_empty());