From 7a5b6312731dcfd075aeaf38d8f18f087b958d69 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 21 Sep 2023 18:52:32 +0100 Subject: [PATCH] feat: add `nippy-jar` format for snapshots (#4512) Co-authored-by: Alexey Shekhirin Co-authored-by: Matthias Seitz --- Cargo.lock | 127 ++- Cargo.toml | 1 + crates/storage/nippy-jar/Cargo.toml | 35 + .../storage/nippy-jar/src/compression/mod.rs | 82 ++ .../storage/nippy-jar/src/compression/zstd.rs | 239 ++++++ crates/storage/nippy-jar/src/cursor.rs | 223 +++++ crates/storage/nippy-jar/src/error.rs | 34 + crates/storage/nippy-jar/src/filter/cuckoo.rs | 80 ++ crates/storage/nippy-jar/src/filter/mod.rs | 39 + crates/storage/nippy-jar/src/lib.rs | 809 ++++++++++++++++++ crates/storage/nippy-jar/src/phf/fmph.rs | 104 +++ crates/storage/nippy-jar/src/phf/go_fmph.rs | 105 +++ crates/storage/nippy-jar/src/phf/mod.rs | 48 ++ 13 files changed, 1923 insertions(+), 3 deletions(-) create mode 100644 crates/storage/nippy-jar/Cargo.toml create mode 100644 crates/storage/nippy-jar/src/compression/mod.rs create mode 100644 crates/storage/nippy-jar/src/compression/zstd.rs create mode 100644 crates/storage/nippy-jar/src/cursor.rs create mode 100644 crates/storage/nippy-jar/src/error.rs create mode 100644 crates/storage/nippy-jar/src/filter/cuckoo.rs create mode 100644 crates/storage/nippy-jar/src/filter/mod.rs create mode 100644 crates/storage/nippy-jar/src/lib.rs create mode 100644 crates/storage/nippy-jar/src/phf/fmph.rs create mode 100644 crates/storage/nippy-jar/src/phf/go_fmph.rs create mode 100644 crates/storage/nippy-jar/src/phf/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f60292d7e..76e4038b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -643,6 +643,12 @@ dependencies = [ "syn 2.0.36", ] +[[package]] +name = "binout" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "288c7b1c00556959bb7dc822d8adad4a30edd0d3a1fcc6839515792b8f300e5f" + [[package]] name = "bit-set" version = "0.5.3" @@ -674,6 +680,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitm" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7becd9fb525c1c507eb025ec37129a0d9320aee17c841085a48101f4f18c0d27" +dependencies = [ + "dyn_size_of", +] + [[package]] name = "bitvec" version = "1.0.1" @@ -714,6 +729,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bloomfilter" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934" +dependencies = [ + "bit-vec", + "getrandom 0.2.10", + "siphasher 1.0.0", +] + [[package]] name = "blst" version = "0.3.11" @@ -1563,6 +1589,20 @@ dependencies = [ "cipher 0.4.4", ] +[[package]] +name = "cuckoofilter" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18" +dependencies = [ + "byteorder", + "fnv", + "rand 0.7.3", + "serde", + "serde_bytes", + "serde_derive", +] + [[package]] name = "curve25519-dalek" version = "4.1.0" @@ -1950,6 +1990,12 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" +[[package]] +name = "dyn_size_of" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b8b8aeb5763fce4ccb8916a3c111f4b004d2de4d74b21da803f5671446cf519" + 
[[package]] name = "ecdsa" version = "0.16.8" @@ -3917,6 +3963,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49388d20533534cd19360ad3d6a7dadc885944aa802ba3995040c5ec11288c6" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.0" @@ -4537,6 +4592,19 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "ph" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb7b1e6e2f58e63b69c3eab9ab28bea7074d327e8334a72f16cc9096c98315b9" +dependencies = [ + "binout", + "bitm", + "dyn_size_of", + "rayon", + "wyhash", +] + [[package]] name = "pharos" version = "0.5.3" @@ -4586,7 +4654,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -5784,6 +5852,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "reth-nippy-jar" +version = "0.1.0-alpha.8" +dependencies = [ + "anyhow", + "bincode", + "bloomfilter", + "bytes", + "cuckoofilter", + "memmap2 0.7.1", + "ph", + "rand 0.8.5", + "serde", + "sucds 0.8.0", + "tempfile", + "thiserror", + "zstd", +] + [[package]] name = "reth-payload-builder" version = "0.1.0-alpha.8" @@ -5844,7 +5931,7 @@ dependencies = [ "serde_with", "sha2", "strum 0.25.0", - "sucds", + "sucds 0.6.0", "tempfile", "test-fuzz", "thiserror", @@ -6771,6 +6858,15 @@ dependencies = [ "smallvec 0.6.14", ] +[[package]] +name = "serde_bytes" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.188" @@ -7026,6 +7122,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe" + [[package]] name = "sketches-ddsketch" version = "0.2.1" @@ -7247,6 +7349,16 @@ dependencies = [ "anyhow", ] +[[package]] +name = "sucds" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab2433064439f81e28aa0ec2ded1c27a95636a7024767ded6047a57e54958d8" +dependencies = [ + "anyhow", + "num-traits", +] + [[package]] name = "symbolic-common" version = "12.4.0" @@ -7254,7 +7366,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e0e9bc48b3852f36a84f8d0da275d50cb3c2b88b59b9ec35fdd8b7fa239e37d" dependencies = [ "debugid", - "memmap2", + "memmap2 0.5.10", "stable_deref_trait", "uuid 1.4.1", ] @@ -8534,6 +8646,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wyhash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf6e163c25e3fac820b4b453185ea2dea3b6a3e0a721d4d23d75bd33734c295" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 7543f35b6..64b7cb49e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "crates/storage/libmdbx-rs", "crates/storage/libmdbx-rs/mdbx-sys", "crates/storage/provider", + "crates/storage/nippy-jar", "crates/tracing", 
"crates/tasks", "crates/transaction-pool", diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml new file mode 100644 index 000000000..036bb90d4 --- /dev/null +++ b/crates/storage/nippy-jar/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "reth-nippy-jar" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Immutable data store format" + +[lib] +name = "reth_nippy_jar" + +[dependencies] +memmap2 = "0.7.1" +bloomfilter = "1" +zstd = { version = "0.12", features = ["experimental", "zdict_builder"] } +ph = "0.8.0" +thiserror = "1.0" +bincode = "1.3" +serde = { version = "1.0", features = ["derive"] } +bytes = "1.5" +cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] } +tempfile = "3.4" +sucds = "~0.8" + +anyhow = "1.0" + + +[dev-dependencies] +rand = { version = "0.8", features = ["small_rng"] } + + +[features] +default = [] \ No newline at end of file diff --git a/crates/storage/nippy-jar/src/compression/mod.rs b/crates/storage/nippy-jar/src/compression/mod.rs new file mode 100644 index 000000000..cd29500f6 --- /dev/null +++ b/crates/storage/nippy-jar/src/compression/mod.rs @@ -0,0 +1,82 @@ +use crate::NippyJarError; +use serde::{Deserialize, Serialize}; +use std::io::Write; + +mod zstd; +pub use zstd::{Zstd, ZstdState}; + +/// Trait that will compress column values +pub trait Compression: Serialize + for<'a> Deserialize<'a> { + /// Returns decompressed data. + fn decompress(&self, value: &[u8]) -> Result, NippyJarError>; + + /// Compresses data from `src` to `dest` + fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError>; + + /// Compresses data from `src` + fn compress(&self, src: &[u8]) -> Result, NippyJarError>; + + /// Returns `true` if it's ready to compress. + /// + /// Example: it will return false, if `zstd` with dictionary is set, but wasn't generated. + fn is_ready(&self) -> bool { + true + } + + /// If required, prepares compression algorithm with an early pass on the data. + fn prepare_compression( + &mut self, + _columns: Vec>>, + ) -> Result<(), NippyJarError> { + Ok(()) + } +} + +/// Enum with different [`Compression`] types. +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub enum Compressors { + Zstd(Zstd), + // Avoids irrefutable let errors. Remove this after adding another one. 
+ Unused, +} + +impl Compression for Compressors { + fn decompress(&self, value: &[u8]) -> Result, NippyJarError> { + match self { + Compressors::Zstd(zstd) => zstd.decompress(value), + Compressors::Unused => unimplemented!(), + } + } + + fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> { + match self { + Compressors::Zstd(zstd) => zstd.compress_to(src, dest), + Compressors::Unused => unimplemented!(), + } + } + + fn compress(&self, src: &[u8]) -> Result, NippyJarError> { + match self { + Compressors::Zstd(zstd) => zstd.compress(src), + Compressors::Unused => unimplemented!(), + } + } + + fn is_ready(&self) -> bool { + match self { + Compressors::Zstd(zstd) => zstd.is_ready(), + Compressors::Unused => unimplemented!(), + } + } + + fn prepare_compression( + &mut self, + columns: Vec>>, + ) -> Result<(), NippyJarError> { + match self { + Compressors::Zstd(zstd) => zstd.prepare_compression(columns), + Compressors::Unused => Ok(()), + } + } +} diff --git a/crates/storage/nippy-jar/src/compression/zstd.rs b/crates/storage/nippy-jar/src/compression/zstd.rs new file mode 100644 index 000000000..219592e76 --- /dev/null +++ b/crates/storage/nippy-jar/src/compression/zstd.rs @@ -0,0 +1,239 @@ +use crate::{compression::Compression, NippyJarError}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + io::{Read, Write}, +}; +use zstd::{ + bulk::{Compressor, Decompressor}, + dict::DecoderDictionary, +}; + +type RawDictionary = Vec; + +#[derive(Debug, Default, PartialEq, Serialize, Deserialize)] +pub enum ZstdState { + #[default] + PendingDictionary, + Ready, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +/// Zstd compression structure. Supports a compression dictionary per column. +pub struct Zstd { + /// State. Should be ready before compressing. + pub(crate) state: ZstdState, + /// Compression level. A level of `0` uses zstd's default (currently `3`). + pub(crate) level: i32, + /// Uses custom dictionaries to compress data. + pub(crate) use_dict: bool, + /// Max size of a dictionary + pub(crate) max_dict_size: usize, + /// List of column dictionaries. + pub(crate) raw_dictionaries: Option>, + /// Number of columns to compress. + columns: usize, +} + +impl Zstd { + /// Creates new [`Zstd`]. + pub fn new(use_dict: bool, max_dict_size: usize, columns: usize) -> Self { + Self { + state: if use_dict { ZstdState::PendingDictionary } else { ZstdState::Ready }, + level: 0, + use_dict, + max_dict_size, + raw_dictionaries: None, + columns, + } + } + + pub fn with_level(mut self, level: i32) -> Self { + self.level = level; + self + } + + /// If using dictionaries, creates a list of [`DecoderDictionary`]. + /// + /// Consumes `self.raw_dictionaries` in the process. + pub fn generate_decompress_dictionaries<'a>(&mut self) -> Option>> { + self.raw_dictionaries.take().map(|dicts| { + // TODO Can we use ::new instead, and avoid consuming? + dicts.iter().map(|dict| DecoderDictionary::copy(dict)).collect() + }) + } + + /// Creates a list of [`Decompressor`] using the given dictionaries. + pub fn generate_decompressors<'a>( + &self, + dictionaries: &'a [DecoderDictionary<'a>], + ) -> Result>, NippyJarError> { + debug_assert!(dictionaries.len() == self.columns); + + Ok(dictionaries + .iter() + .map(Decompressor::with_prepared_dictionary) + .collect::, _>>()?) + } + + /// If using dictionaries, creates a list of [`Compressor`]. 
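// Illustrative sketch (editorial, not part of this patch): without dictionaries, `Zstd`
// is used through the `Compression` trait for a plain round trip. Assumes a single
// column, hence `columns = 1`.
//
//   let zstd = Zstd::new(false, 0, 1);
//   let compressed = zstd.compress(b"some column value")?;
//   assert_eq!(zstd.decompress(&compressed)?, b"some column value".to_vec());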
+ pub fn generate_compressors<'a>(&self) -> Result>>, NippyJarError> { + match self.state { + ZstdState::PendingDictionary => Err(NippyJarError::CompressorNotReady), + ZstdState::Ready => { + if !self.use_dict { + return Ok(None) + } + + let mut compressors = None; + if let Some(dictionaries) = &self.raw_dictionaries { + let mut cmp = Vec::with_capacity(dictionaries.len()); + + for dict in dictionaries { + cmp.push(Compressor::with_dictionary(0, dict)?); + } + compressors = Some(cmp) + } + Ok(compressors) + } + } + } + + /// Compresses a value using a dictionary. + pub fn compress_with_dictionary( + column_value: &[u8], + tmp_buf: &mut Vec, + handle: &mut File, + compressor: Option<&mut Compressor>, + ) -> Result<(), NippyJarError> { + if let Some(compressor) = compressor { + // Compressor requires the destination buffer to be big enough to write, otherwise it + // fails. However, we don't know how big it will be. If data is small + // enough, the compressed buffer will actually be larger. We keep retrying. + // If we eventually fail, it probably means it's another kind of error. + let mut multiplier = 1; + while let Err(err) = compressor.compress_to_buffer(column_value, tmp_buf) { + tmp_buf.reserve(column_value.len() * multiplier); + multiplier += 1; + if multiplier == 5 { + return Err(NippyJarError::Disconnect(err)) + } + } + + handle.write_all(tmp_buf)?; + tmp_buf.clear(); + } else { + handle.write_all(column_value)?; + } + + Ok(()) + } + + /// Decompresses a value using a dictionary to a user provided buffer. + pub fn decompress_with_dictionary( + column_value: &[u8], + output: &mut Vec, + decompressor: &mut Decompressor<'_>, + ) -> Result<(), NippyJarError> { + let mut multiplier = 1; + + // Just an estimation. + let required_capacity = column_value.len() * 2; + + output.reserve(required_capacity.saturating_sub(output.capacity())); + + // Decompressor requires the destination buffer to be big enough to write to, otherwise it + // fails. However, we don't know how big it will be. We keep retrying. + // If we eventually fail, it probably means it's another kind of error. + while let Err(err) = decompressor.decompress_to_buffer(column_value, output) { + output.reserve( + Decompressor::upper_bound(column_value).unwrap_or(required_capacity) * multiplier, + ); + + multiplier += 1; + if multiplier == 5 { + return Err(NippyJarError::Disconnect(err)) + } + } + + Ok(()) + } +} + +impl Compression for Zstd { + fn decompress(&self, value: &[u8]) -> Result, NippyJarError> { + let mut decompressed = Vec::with_capacity(value.len() * 2); + let mut decoder = zstd::Decoder::new(value)?; + decoder.read_to_end(&mut decompressed)?; + Ok(decompressed) + } + + fn compress_to(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> { + let mut encoder = zstd::Encoder::new(dest, self.level)?; + encoder.write_all(src)?; + + encoder.finish()?; + + Ok(()) + } + + fn compress(&self, src: &[u8]) -> Result, NippyJarError> { + let mut compressed = Vec::with_capacity(src.len()); + + self.compress_to(src, &mut compressed)?; + + Ok(compressed) + } + + fn is_ready(&self) -> bool { + matches!(self.state, ZstdState::Ready) + } + + /// If using it with dictionaries, prepares a dictionary for each column. 
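// Illustrative sketch (editorial, not part of this patch): the dictionary flow is two
// phased. `prepare_compression` (below) trains one dictionary per column from sample
// data, then `generate_compressors` yields one `Compressor` per column, which `freeze`
// feeds into `compress_with_dictionary`. `col0_samples`/`col1_samples` are placeholder
// `Vec<Vec<u8>>` values.
//
//   let mut zstd = Zstd::new(true, 5000, 2);
//   zstd.prepare_compression(vec![col0_samples, col1_samples])?;
//   let per_column_compressors = zstd.generate_compressors()?; // Some(vec![c0, c1])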
+ fn prepare_compression( + &mut self, + columns: Vec>>, + ) -> Result<(), NippyJarError> { + if !self.use_dict { + return Ok(()) + } + + // There's a per 2GB hard limit on each column data set for training + // REFERENCE: https://github.com/facebook/zstd/blob/dev/programs/zstd.1.md#dictionary-builder + // ``` + // -M#, --memory=#: Limit the amount of sample data loaded for training (default: 2 GB). + // Note that the default (2 GB) is also the maximum. This parameter can be useful in + // situations where the training set size is not well controlled and could be potentially + // very large. Since speed of the training process is directly correlated to the size of the + // training sample set, a smaller sample set leads to faster training.` + // ``` + + if columns.len() != self.columns { + return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len())) + } + + // TODO: parallel calculation + let mut dictionaries = vec![]; + for column in columns { + // ZSTD requires all training data to be continuous in memory, alongside the size of + // each entry + let mut sizes = vec![]; + let data: Vec<_> = column + .into_iter() + .flat_map(|data| { + sizes.push(data.len()); + data + }) + .collect(); + + dictionaries.push(zstd::dict::from_continuous(&data, &sizes, self.max_dict_size)?); + } + + debug_assert_eq!(dictionaries.len(), self.columns); + + self.raw_dictionaries = Some(dictionaries); + self.state = ZstdState::Ready; + + Ok(()) + } +} diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs new file mode 100644 index 000000000..1ea0fea10 --- /dev/null +++ b/crates/storage/nippy-jar/src/cursor.rs @@ -0,0 +1,223 @@ +use crate::{ + compression::{Compression, Zstd}, + InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, Row, +}; +use memmap2::Mmap; +use serde::{de::Deserialize, ser::Serialize}; +use std::{clone::Clone, fs::File}; +use sucds::int_vectors::Access; +use zstd::bulk::Decompressor; + +/// Simple cursor implementation to retrieve data from [`NippyJar`]. +pub struct NippyJarCursor<'a, H> { + /// [`NippyJar`] which holds most of the required configuration to read from the file. + jar: &'a NippyJar, + /// Optional dictionary decompressors. + zstd_decompressors: Option>>, + /// Data file. + #[allow(unused)] + file_handle: File, + /// Data file. + mmap_handle: Mmap, + /// Temporary buffer to unload data to (if necessary), without reallocating memory on each + /// retrieval. + tmp_buf: Vec, + /// Cursor row position. + row: u64, +} + +impl<'a, H> std::fmt::Debug for NippyJarCursor<'a, H> +where + H: Send + Sync + Serialize + for<'b> Deserialize<'b> + core::fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive() + } +} + +impl<'a, H> NippyJarCursor<'a, H> +where + H: Send + Sync + Serialize + for<'b> Deserialize<'b>, +{ + pub fn new( + jar: &'a NippyJar, + zstd_decompressors: Option>>, + ) -> Result { + let file = File::open(jar.data_path())?; + + // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle. + let mmap = unsafe { Mmap::map(&file)? }; + + Ok(NippyJarCursor { + jar, + zstd_decompressors, + file_handle: file, + mmap_handle: mmap, + tmp_buf: vec![], + row: 0, + }) + } + + /// Resets cursor to the beginning. + pub fn reset(&mut self) { + self.row = 0; + } + + /// Returns a row, searching it by a key used during [`NippyJar::prepare_index`]. 
+ /// + /// **May return false positives.** + /// + /// Example usage would be querying a transactions file with a transaction hash which is **NOT** + /// stored in file. + pub fn row_by_key(&mut self, key: &[u8]) -> Result, NippyJarError> { + if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) { + // TODO: is it worth to parallize both? + + // May have false positives + if filter.contains(key)? { + // May have false positives + if let Some(row_index) = phf.get_index(key)? { + self.row = self + .jar + .offsets_index + .access(row_index as usize) + .expect("built from same set") as u64; + return self.next_row() + } + } + } else { + return Err(NippyJarError::UnsupportedFilterQuery) + } + + Ok(None) + } + + /// Returns a row by its number. + pub fn row_by_number(&mut self, row: usize) -> Result, NippyJarError> { + self.row = row as u64; + self.next_row() + } + + /// Returns the current value and advances the row. + pub fn next_row(&mut self) -> Result, NippyJarError> { + if self.row as usize * self.jar.columns >= self.jar.offsets.len() { + // Has reached the end + return Ok(None) + } + + let mut row = Vec::with_capacity(self.jar.columns); + + // Retrieve all column values from the row + for column in 0..self.jar.columns { + self.read_value(column, &mut row)?; + } + + self.row += 1; + + Ok(Some(row)) + } + + /// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a + /// `MASK` to only read certain columns from the row. + /// + /// **May return false positives.** + /// + /// Example usage would be querying a transactions file with a transaction hash which is **NOT** + /// stored in file. + pub fn row_by_key_with_cols( + &mut self, + key: &[u8], + ) -> Result, NippyJarError> { + if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) { + // TODO: is it worth to parallize both? + + // May have false positives + if filter.contains(key)? { + // May have false positives + if let Some(row_index) = phf.get_index(key)? { + self.row = self + .jar + .offsets_index + .access(row_index as usize) + .expect("built from same set") as u64; + return self.next_row_with_cols::() + } + } + } else { + return Err(NippyJarError::UnsupportedFilterQuery) + } + + Ok(None) + } + + /// Returns a row by its number by using a `MASK` to only read certain columns from the row. + pub fn row_by_number_with_cols( + &mut self, + row: usize, + ) -> Result, NippyJarError> { + self.row = row as u64; + self.next_row_with_cols::() + } + + /// Returns the current value and advances the row. + /// + /// Uses a `MASK` to only read certain columns from the row. + pub fn next_row_with_cols( + &mut self, + ) -> Result, NippyJarError> { + debug_assert!(COLUMNS == self.jar.columns); + + if self.row as usize * self.jar.columns >= self.jar.offsets.len() { + // Has reached the end + return Ok(None) + } + + let mut row = Vec::with_capacity(COLUMNS); + + for column in 0..COLUMNS { + if MASK & (1 << column) != 0 { + self.read_value(column, &mut row)? + } + } + + self.row += 1; + + Ok(Some(row)) + } + + /// Takes the column index and reads the value for the corresponding column. 
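// Illustrative sketch (editorial, not part of this patch): with a two-column jar built
// with a filter and phf, a caller can restrict reads to a subset of columns via a bit
// mask, where bit `i` selects column `i`. `jar` and `key` are placeholders.
//
//   const MASK: usize = 0b01; // first column only
//   const COLUMNS: usize = 2;
//   let mut cursor = NippyJarCursor::new(&jar, None)?;
//   let row = cursor.row_by_key_with_cols::<MASK, COLUMNS>(key)?; // Some(vec![column_0_value])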
+ fn read_value(&mut self, column: usize, row: &mut Row) -> Result<(), NippyJarError> { + // Find out the offset of the column value + let offset_pos = self.row as usize * self.jar.columns + column; + let value_offset = self.jar.offsets.select(offset_pos).expect("should exist"); + + let column_value = if self.jar.offsets.len() == (offset_pos + 1) { + // It's the last column of the last row + &self.mmap_handle[value_offset..] + } else { + let next_value_offset = self.jar.offsets.select(offset_pos + 1).expect("should exist"); + &self.mmap_handle[value_offset..next_value_offset] + }; + + if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() { + self.tmp_buf.clear(); + + if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) { + Zstd::decompress_with_dictionary(column_value, &mut self.tmp_buf, decompressor)?; + } + + debug_assert!(!self.tmp_buf.is_empty()); + + row.push(self.tmp_buf.clone()); + } else if let Some(compression) = &self.jar.compressor { + // Uses the chosen default decompressor + row.push(compression.decompress(column_value)?); + } else { + // Not compressed + // TODO: return Cow<&> instead of copying if there's no compression + row.push(column_value.to_vec()) + } + + Ok(()) + } +} diff --git a/crates/storage/nippy-jar/src/error.rs b/crates/storage/nippy-jar/src/error.rs new file mode 100644 index 000000000..86353d0a7 --- /dev/null +++ b/crates/storage/nippy-jar/src/error.rs @@ -0,0 +1,34 @@ +use thiserror::Error; + +/// Errors associated with [`crate::NippyJar`]. +#[derive(Debug, Error)] +pub enum NippyJarError { + #[error(transparent)] + Disconnect(#[from] std::io::Error), + #[error(transparent)] + Bincode(#[from] Box), + #[error(transparent)] + EliasFano(#[from] anyhow::Error), + #[error("Compression was enabled, but it's not ready yet.")] + CompressorNotReady, + #[error("Decompression was enabled, but it's not ready yet.")] + DecompressorNotReady, + #[error("Number of columns does not match. {0} != {1}")] + ColumnLenMismatch(usize, usize), + #[error("UnexpectedMissingValue row: {0} col:{1}")] + UnexpectedMissingValue(u64, u64), + #[error(transparent)] + FilterError(#[from] cuckoofilter::CuckooError), + #[error("NippyJar initialized without filter.")] + FilterMissing, + #[error("Filter has reached max capacity.")] + FilterMaxCapacity, + #[error("Cuckoo was not properly initialized after loaded.")] + FilterCuckooNotLoaded, + #[error("Perfect hashing function doesn't have any keys added.")] + PHFMissingKeys, + #[error("NippyJar initialized without perfect hashing function.")] + PHFMissing, + #[error("NippyJar was built without an index.")] + UnsupportedFilterQuery, +} diff --git a/crates/storage/nippy-jar/src/filter/cuckoo.rs b/crates/storage/nippy-jar/src/filter/cuckoo.rs new file mode 100644 index 000000000..139af65e4 --- /dev/null +++ b/crates/storage/nippy-jar/src/filter/cuckoo.rs @@ -0,0 +1,80 @@ +use super::InclusionFilter; +use crate::NippyJarError; +use cuckoofilter::{self, CuckooFilter, ExportedCuckooFilter}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::collections::hash_map::DefaultHasher; + +/// [CuckooFilter](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf). It builds and provides an approximated set-membership filter to answer queries such as "Does this element belong to this set?". Has a theoretical 3% false positive rate. +pub struct Cuckoo { + /// Remaining number of elements that can be added. 
+ /// + /// This is necessary because the inner implementation will fail on adding an element past capacity, **but it will still add it and remove other**: [source](https://github.com/axiomhq/rust-cuckoofilter/tree/624da891bed1dd5d002c8fa92ce0dcd301975561#notes--todos) + remaining: usize, + + /// CuckooFilter. + filter: CuckooFilter, // TODO does it need an actual hasher? +} + +impl Cuckoo { + pub fn new(max_capacity: usize) -> Self { + Cuckoo { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) } + } +} + +impl InclusionFilter for Cuckoo { + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + if self.remaining == 0 { + return Err(NippyJarError::FilterMaxCapacity) + } + + self.remaining -= 1; + + Ok(self.filter.add(element)?) + } + + fn contains(&self, element: &[u8]) -> Result { + Ok(self.filter.contains(element)) + } +} + +impl std::fmt::Debug for Cuckoo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Cuckoo") + .field("remaining", &self.remaining) + .field("filter_size", &self.filter.memory_usage()) + .finish_non_exhaustive() + } +} + +#[cfg(test)] +impl PartialEq for Cuckoo { + fn eq(&self, _other: &Self) -> bool { + self.remaining == _other.remaining && { + let f1 = self.filter.export(); + let f2 = _other.filter.export(); + f1.length == f2.length && f1.values == f2.values + } + } +} + +impl<'de> Deserialize<'de> for Cuckoo { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let (remaining, exported): (usize, ExportedCuckooFilter) = + Deserialize::deserialize(deserializer)?; + + Ok(Cuckoo { remaining, filter: exported.into() }) + } +} + +impl Serialize for Cuckoo { + /// Potentially expensive, but should be used only when creating the file. + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + (self.remaining, self.filter.export()).serialize(serializer) + } +} diff --git a/crates/storage/nippy-jar/src/filter/mod.rs b/crates/storage/nippy-jar/src/filter/mod.rs new file mode 100644 index 000000000..e8e6294eb --- /dev/null +++ b/crates/storage/nippy-jar/src/filter/mod.rs @@ -0,0 +1,39 @@ +use crate::NippyJarError; +use serde::{Deserialize, Serialize}; + +mod cuckoo; +pub use cuckoo::Cuckoo; + +/// Membership filter set trait. +pub trait InclusionFilter { + /// Add element to the inclusion list. + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError>; + + /// Checks if the element belongs to the inclusion list. **There might be false positives.** + fn contains(&self, element: &[u8]) -> Result; +} + +/// Enum with different [`InclusionFilter`] types. +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub enum InclusionFilters { + Cuckoo(Cuckoo), + // Avoids irrefutable let errors. Remove this after adding another one. 
+ Unused, +} + +impl InclusionFilter for InclusionFilters { + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + match self { + InclusionFilters::Cuckoo(c) => c.add(element), + InclusionFilters::Unused => todo!(), + } + } + + fn contains(&self, element: &[u8]) -> Result { + match self { + InclusionFilters::Cuckoo(c) => c.contains(element), + InclusionFilters::Unused => todo!(), + } + } +} diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs new file mode 100644 index 000000000..bd491871a --- /dev/null +++ b/crates/storage/nippy-jar/src/lib.rs @@ -0,0 +1,809 @@ +use serde::{Deserialize, Serialize}; +use std::{ + clone::Clone, + fs::File, + hash::Hash, + io::{Seek, Write}, + marker::Sync, + path::{Path, PathBuf}, +}; +use sucds::{ + int_vectors::PrefixSummedEliasFano, + mii_sequences::{EliasFano, EliasFanoBuilder}, + Serializable, +}; + +pub mod filter; +use filter::{Cuckoo, InclusionFilter, InclusionFilters}; + +pub mod compression; +use compression::{Compression, Compressors}; + +pub mod phf; +use phf::{Fmph, Functions, GoFmph, PerfectHashingFunction}; + +mod error; +pub use error::NippyJarError; + +mod cursor; +pub use cursor::NippyJarCursor; + +const NIPPY_JAR_VERSION: usize = 1; + +/// A [`Row`] is a list of its selected column values. +type Row = Vec>; + +/// `NippyJar` is a specialized storage format designed for immutable data. +/// +/// Data is organized into a columnar format, enabling column-based compression. Data retrieval +/// entails consulting an offset list and fetching the data from file via `mmap`. +/// +/// PHF & Filters: +/// For data membership verification, the `filter` field can be configured with algorithms like +/// Bloom or Cuckoo filters. While these filters enable rapid membership checks, it's important to +/// note that **they may yield false positives but not false negatives**. Therefore, they serve as +/// preliminary checks (eg. in `by_hash` queries) and should be followed by data verification on +/// retrieval. +/// +/// The `phf` (Perfect Hashing Function) and `offsets_index` fields facilitate the data retrieval +/// process in for example `by_hash` queries. Specifically, the PHF converts a query, such as a +/// block hash, into a unique integer. This integer is then used as an index in `offsets_index`, +/// which maps to the actual data location in the `offsets` list. Similar to the `filter`, the PHF +/// may also produce false positives but not false negatives, necessitating subsequent data +/// verification. +/// +/// Note: that the key (eg. BlockHash) passed to a filter and phf does not need to actually be +/// stored. +/// +/// Ultimately, the `freeze` function yields two files: a data file containing both the data and its +/// configuration, and an index file that houses the offsets and offsets_index. +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub struct NippyJar { + /// The version of the NippyJar format. + version: usize, + /// User-defined header data. + /// Default: zero-sized unit type: no header data + user_header: H, + /// Number of data columns in the jar. + columns: usize, + /// Optional compression algorithm applied to the data. + compressor: Option, + /// Optional filter function for data membership checks. + filter: Option, + /// Optional Perfect Hashing Function (PHF) for unique offset mapping. + phf: Option, + /// Index mapping PHF output to value offsets in `offsets`. 
+ #[serde(skip)] + offsets_index: PrefixSummedEliasFano, + /// Offsets within the file for each column value, arranged by row and column. + #[serde(skip)] + offsets: EliasFano, + /// Data path for file. Index file will be `{path}.idx` + #[serde(skip)] + path: Option, +} + +impl NippyJar<()> { + /// Creates a new [`NippyJar`] without an user-defined header data. + pub fn new_without_header(columns: usize, path: &Path) -> Self { + NippyJar::<()>::new(columns, path, ()) + } + + /// Loads the file configuration and returns [`Self`] on a jar without user-defined header data. + pub fn load_without_header(path: &Path) -> Result { + NippyJar::<()>::load(path) + } +} + +impl NippyJar +where + H: Send + Sync + Serialize + for<'a> Deserialize<'a>, +{ + /// Creates a new [`NippyJar`] with a user-defined header data. + pub fn new(columns: usize, path: &Path, user_header: H) -> Self { + NippyJar { + version: NIPPY_JAR_VERSION, + user_header, + columns, + compressor: None, + filter: None, + phf: None, + offsets: EliasFano::default(), + offsets_index: PrefixSummedEliasFano::default(), + path: Some(path.to_path_buf()), + } + } + + /// Adds [`compression::Zstd`] compression. + pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self { + self.compressor = + Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns))); + self + } + + /// Adds [`filter::Cuckoo`] filter. + pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self { + self.filter = Some(InclusionFilters::Cuckoo(Cuckoo::new(max_capacity))); + self + } + + /// Adds [`phf::Fmph`] perfect hashing function. + pub fn with_mphf(mut self) -> Self { + self.phf = Some(Functions::Fmph(Fmph::new())); + self + } + + /// Adds [`phf::GoFmph`] perfect hashing function. + pub fn with_gomphf(mut self) -> Self { + self.phf = Some(Functions::GoFmph(GoFmph::new())); + self + } + + /// Gets a reference to the user header. + pub fn user_header(&self) -> &H { + &self.user_header + } + + /// Loads the file configuration and returns [`Self`]. + /// + /// **The user must ensure the header type matches the one used during the jar's creation.** + pub fn load(path: &Path) -> Result { + // Read [`Self`] located at the data file. + let data_file = File::open(path)?; + + // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle. + let data_reader = unsafe { memmap2::Mmap::map(&data_file)? }; + let mut obj: Self = bincode::deserialize_from(data_reader.as_ref())?; + obj.path = Some(path.to_path_buf()); + + // Read the offsets lists located at the index file. + let offsets_file = File::open(obj.index_path())?; + + // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle. + let mmap = unsafe { memmap2::Mmap::map(&offsets_file)? }; + let mut offsets_reader = mmap.as_ref(); + obj.offsets = EliasFano::deserialize_from(&mut offsets_reader)?; + obj.offsets_index = PrefixSummedEliasFano::deserialize_from(offsets_reader)?; + + Ok(obj) + } + + /// Returns the path from the data file + pub fn data_path(&self) -> PathBuf { + self.path.clone().expect("exists") + } + + /// Returns the path from the index file + pub fn index_path(&self) -> PathBuf { + let data_path = self.data_path(); + data_path + .parent() + .expect("exists") + .join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy())) + } + + /// If required, prepares any compression algorithm to an early pass of the data. 
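// Illustrative sketch (editorial, not part of this patch): configuration is builder
// style, mirroring the tests below. `path` and `row_count` are placeholders.
//
//   let jar = NippyJar::new_without_header(2, path)
//       .with_zstd(true, 5000)
//       .with_cuckoo_filter(row_count)
//       .with_mphf();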
+ pub fn prepare_compression( + &mut self, + columns: Vec>>, + ) -> Result<(), NippyJarError> { + // Makes any necessary preparations for the compressors + if let Some(compression) = &mut self.compressor { + compression.prepare_compression(columns)?; + } + Ok(()) + } + + /// Prepares beforehand the offsets index for querying rows based on `values` (eg. transaction + /// hash). Expects `values` to be sorted in the same way as the data that is going to be + /// later on inserted. + pub fn prepare_index + Sync + Clone + Hash>( + &mut self, + values: &[T], + ) -> Result<(), NippyJarError> { + let mut offsets_index = vec![0; values.len()]; + + // Builds perfect hashing function from the values + if let Some(phf) = self.phf.as_mut() { + phf.set_keys(values)?; + } + + if self.filter.is_some() || self.phf.is_some() { + for (row_num, v) in values.iter().enumerate() { + if let Some(filter) = self.filter.as_mut() { + filter.add(v.as_ref())?; + } + + if let Some(phf) = self.phf.as_mut() { + // Points to the first column value offset of the row. + let index = phf.get_index(v.as_ref())?.expect("initialized") as usize; + let _ = std::mem::replace(&mut offsets_index[index], row_num as u64); + } + } + } + + self.offsets_index = PrefixSummedEliasFano::from_slice(&offsets_index)?; + Ok(()) + } + + /// Writes all data and configuration to a file and the offset index to another. + pub fn freeze( + &mut self, + columns: Vec>>, + total_rows: u64, + ) -> Result<(), NippyJarError> { + let mut file = self.freeze_check(&columns)?; + self.freeze_config(&mut file)?; + + // Special case for zstd that might use custom dictionaries/compressors per column + // If any other compression algorithm is added and uses a similar flow, then revisit + // implementation + let mut maybe_zstd_compressors = None; + if let Some(Compressors::Zstd(zstd)) = &self.compressor { + maybe_zstd_compressors = zstd.generate_compressors()?; + } + + // Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/ + // dict) + let mut tmp_buf = Vec::with_capacity(100); + + // Write all rows while taking all row start offsets + let mut row_number = 0u64; + let mut offsets = Vec::with_capacity(total_rows as usize * self.columns); + let mut column_iterators = + columns.into_iter().map(|v| v.into_iter()).collect::>().into_iter(); + + loop { + let mut iterators = Vec::with_capacity(self.columns); + + // Write the column value of each row + // TODO: iter_mut if we remove the IntoIterator interface. + for (column_number, mut column_iter) in column_iterators.enumerate() { + offsets.push(file.stream_position()? as usize); + + match column_iter.next() { + Some(value) => { + if let Some(compression) = &self.compressor { + // Special zstd case with dictionaries + if let (Some(dict_compressors), Compressors::Zstd(_)) = + (maybe_zstd_compressors.as_mut(), compression) + { + compression::Zstd::compress_with_dictionary( + &value, + &mut tmp_buf, + &mut file, + Some(dict_compressors.get_mut(column_number).expect("exists")), + )?; + } else { + compression.compress_to(&value, &mut file)?; + } + } else { + file.write_all(&value)?; + } + } + None => { + return Err(NippyJarError::UnexpectedMissingValue( + row_number, + column_number as u64, + )) + } + } + + iterators.push(column_iter); + } + + row_number += 1; + if row_number == total_rows { + break + } + + column_iterators = iterators.into_iter(); + } + + // Write offsets and offset index to file + self.freeze_offsets(offsets)?; + + Ok(()) + } + + /// Freezes offsets and its own index. 
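// Illustrative sketch (editorial, not part of this patch): the write path is
// `prepare_compression` (only when zstd dictionaries are enabled), then `prepare_index`
// (only when a filter/phf is configured), then `freeze`, which writes the data file and
// the `.idx` offsets file. `data`, `keys` and `row_count` are placeholders.
//
//   jar.prepare_compression(data.clone())?; // trains per-column zstd dictionaries
//   jar.prepare_index(&keys)?;              // fills the filter, phf and offsets_index
//   jar.freeze(data, row_count)?;           // writes `{path}` and `{path}.idx`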
+ fn freeze_offsets(&mut self, offsets: Vec) -> Result<(), NippyJarError> { + if !offsets.is_empty() { + let mut builder = + EliasFanoBuilder::new(*offsets.last().expect("qed") + 1, offsets.len())?; + + for offset in offsets { + builder.push(offset)?; + } + self.offsets = builder.build().enable_rank(); + } + let mut file = File::create(self.index_path())?; + self.offsets.serialize_into(&mut file)?; + self.offsets_index.serialize_into(file)?; + Ok(()) + } + + /// Safety checks before creating and returning a [`File`] handle to write data to. + fn freeze_check( + &mut self, + columns: &Vec>>, + ) -> Result { + if columns.len() != self.columns { + return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len())) + } + + if let Some(compression) = &self.compressor { + if !compression.is_ready() { + return Err(NippyJarError::CompressorNotReady) + } + } + + // Check `prepare_index` was called. + if let Some(phf) = &self.phf { + let _ = phf.get_index(&[])?; + } + + Ok(File::create(self.data_path())?) + } + + /// Writes all necessary configuration to file. + fn freeze_config(&mut self, handle: &mut File) -> Result<(), NippyJarError> { + // TODO Split Dictionaries and Bloomfilters Configuration so we dont have to load everything + // at once + Ok(bincode::serialize_into(handle, &self)?) + } +} + +impl InclusionFilter for NippyJar +where + H: Send + Sync + Serialize + for<'a> Deserialize<'a>, +{ + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element) + } + + fn contains(&self, element: &[u8]) -> Result { + self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element) + } +} + +impl PerfectHashingFunction for NippyJar +where + H: Send + Sync + Serialize + for<'a> Deserialize<'a>, +{ + fn set_keys + Sync + Clone + Hash>( + &mut self, + keys: &[T], + ) -> Result<(), NippyJarError> { + self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys) + } + + fn get_index(&self, key: &[u8]) -> Result, NippyJarError> { + self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng}; + use std::collections::HashSet; + + type ColumnValues = Vec>; + + fn test_data(seed: Option) -> (ColumnValues, ColumnValues) { + let value_length = 32; + let num_rows = 100; + + let mut vec: Vec = vec![0; value_length]; + let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_entropy); + + let mut gen = || { + (0..num_rows) + .map(|_| { + rng.fill_bytes(&mut vec[..]); + vec.clone() + }) + .collect() + }; + + (gen(), gen()) + } + + #[test] + fn test_phf() { + let (col1, col2) = test_data(None); + let num_columns = 2; + let num_rows = col1.len() as u64; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()); + assert!(matches!(NippyJar::set_keys(&mut nippy, &col1), Err(NippyJarError::PHFMissing))); + + let check_phf = |nippy: &mut NippyJar<_>| { + assert!(matches!( + NippyJar::get_index(nippy, &col1[0]), + Err(NippyJarError::PHFMissingKeys) + )); + assert!(NippyJar::set_keys(nippy, &col1).is_ok()); + + let collect_indexes = |nippy: &NippyJar<_>| -> Vec { + col1.iter() + .map(|value| NippyJar::get_index(nippy, value.as_slice()).unwrap().unwrap()) + .collect() + }; + + // Ensure all indexes are unique + let indexes = collect_indexes(nippy); + assert_eq!(indexes.iter().collect::>().len(), 
indexes.len()); + + // Ensure reproducibility + assert!(NippyJar::set_keys(nippy, &col1).is_ok()); + assert_eq!(indexes, collect_indexes(nippy)); + + // Ensure that loaded phf provides the same function outputs + nippy.prepare_index(&col1).unwrap(); + nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap(); + let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + assert_eq!(indexes, collect_indexes(&loaded_nippy)); + }; + + // mphf bytes size for 100 values of 32 bytes: 54 + nippy = nippy.with_mphf(); + check_phf(&mut nippy); + + // mphf bytes size for 100 values of 32 bytes: 46 + nippy = nippy.with_gomphf(); + check_phf(&mut nippy); + } + + #[test] + fn test_filter() { + let (col1, col2) = test_data(Some(1)); + let num_columns = 2; + let num_rows = col1.len() as u64; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()); + + assert!(matches!( + InclusionFilter::add(&mut nippy, &col1[0]), + Err(NippyJarError::FilterMissing) + )); + + nippy = nippy.with_cuckoo_filter(4); + + // Add col1[0] + assert!(!InclusionFilter::contains(&nippy, &col1[0]).unwrap()); + assert!(InclusionFilter::add(&mut nippy, &col1[0]).is_ok()); + assert!(InclusionFilter::contains(&nippy, &col1[0]).unwrap()); + + // Add col1[1] + assert!(!InclusionFilter::contains(&nippy, &col1[1]).unwrap()); + assert!(InclusionFilter::add(&mut nippy, &col1[1]).is_ok()); + assert!(InclusionFilter::contains(&nippy, &col1[1]).unwrap()); + + // // Add more columns until max_capacity + assert!(InclusionFilter::add(&mut nippy, &col1[2]).is_ok()); + assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok()); + assert!(matches!( + InclusionFilter::add(&mut nippy, &col1[4]), + Err(NippyJarError::FilterMaxCapacity) + )); + + nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap(); + let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + + assert_eq!(nippy, loaded_nippy); + + assert!(InclusionFilter::contains(&loaded_nippy, &col1[0]).unwrap()); + assert!(InclusionFilter::contains(&loaded_nippy, &col1[1]).unwrap()); + assert!(InclusionFilter::contains(&loaded_nippy, &col1[2]).unwrap()); + assert!(InclusionFilter::contains(&loaded_nippy, &col1[3]).unwrap()); + assert!(!InclusionFilter::contains(&loaded_nippy, &col1[4]).unwrap()); + } + + #[test] + fn test_zstd_with_dictionaries() { + let (col1, col2) = test_data(None); + let num_rows = col1.len() as u64; + let num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let nippy = NippyJar::new_without_header(num_columns, file_path.path()); + assert!(nippy.compressor.is_none()); + + let mut nippy = + NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000); + assert!(nippy.compressor.is_some()); + + if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor { + assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady))); + + // Make sure the number of column iterators match the initial set up ones. + assert!(matches!( + zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]), + Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns + )); + } + + let data = vec![col1.clone(), col2.clone()]; + + // If ZSTD is enabled, do not write to the file unless the column dictionaries have been + // calculated. 
+ assert!(matches!( + nippy.freeze(data.clone(), num_rows), + Err(NippyJarError::CompressorNotReady) + )); + + nippy.prepare_compression(data.clone()).unwrap(); + + if let Some(Compressors::Zstd(zstd)) = &nippy.compressor { + assert!(matches!( + (&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())), + (compression::ZstdState::Ready, Some(columns)) if columns == num_columns + )); + } + + nippy.freeze(data.clone(), num_rows).unwrap(); + + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + assert_eq!(nippy, loaded_nippy); + + let mut dicts = vec![]; + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + dicts = zstd.generate_decompress_dictionaries().unwrap() + } + + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + let mut cursor = NippyJarCursor::new( + &loaded_nippy, + Some(zstd.generate_decompressors(&dicts).unwrap()), + ) + .unwrap(); + + // Iterate over compressed values and compare + let mut row_index = 0usize; + while let Some(row) = cursor.next_row().unwrap() { + assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index])); + row_index += 1; + } + } + } + + #[test] + fn test_zstd_no_dictionaries() { + let (col1, col2) = test_data(None); + let num_rows = col1.len() as u64; + let num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let nippy = NippyJar::new_without_header(num_columns, file_path.path()); + assert!(nippy.compressor.is_none()); + + let mut nippy = + NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000); + assert!(nippy.compressor.is_some()); + + let data = vec![col1.clone(), col2.clone()]; + + nippy.freeze(data.clone(), num_rows).unwrap(); + + let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + assert_eq!(nippy, loaded_nippy); + + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + assert!(!zstd.use_dict); + + let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap(); + + // Iterate over compressed values and compare + let mut row_index = 0usize; + while let Some(row) = cursor.next_row().unwrap() { + assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index])); + row_index += 1; + } + } else { + panic!("Expected Zstd compressor") + } + } + + /// Tests NippyJar with everything enabled: compression, filter, offset list and offset index. 
+ #[test] + fn test_full_nippy_jar() { + let (col1, col2) = test_data(None); + let num_rows = col1.len() as u64; + let num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); + let data = vec![col1.clone(), col2.clone()]; + let block_start = 500; + + #[derive(Serialize, Deserialize, Debug)] + pub struct BlockJarHeader { + block_start: usize, + } + + // Create file + { + let mut nippy = + NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start }) + .with_zstd(true, 5000) + .with_cuckoo_filter(col1.len()) + .with_mphf(); + + nippy.prepare_compression(data.clone()).unwrap(); + nippy.prepare_index(&col1).unwrap(); + nippy.freeze(data.clone(), num_rows).unwrap(); + } + + // Read file + { + let mut loaded_nippy = NippyJar::::load(file_path.path()).unwrap(); + + assert!(loaded_nippy.compressor.is_some()); + assert!(loaded_nippy.filter.is_some()); + assert!(loaded_nippy.phf.is_some()); + assert_eq!(loaded_nippy.user_header().block_start, block_start); + + let mut dicts = vec![]; + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + dicts = zstd.generate_decompress_dictionaries().unwrap() + } + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + let mut cursor = NippyJarCursor::new( + &loaded_nippy, + Some(zstd.generate_decompressors(&dicts).unwrap()), + ) + .unwrap(); + + // Iterate over compressed values and compare + let mut row_num = 0usize; + while let Some(row) = cursor.next_row().unwrap() { + assert_eq!((&row[0], &row[1]), (&data[0][row_num], &data[1][row_num])); + row_num += 1; + } + + // Shuffled for chaos. + let mut data = col1.iter().zip(col2.iter()).enumerate().collect::>(); + data.shuffle(&mut rand::thread_rng()); + + for (row_num, (v0, v1)) in data { + // Simulates `by_hash` queries by iterating col1 values, which were used to + // create the inner index. + let row_by_value = cursor.row_by_key(v0).unwrap().unwrap(); + assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1)); + + // Simulates `by_number` queries + let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap(); + assert_eq!(row_by_value, row_by_num); + } + } + } + } + + #[test] + fn test_selectable_column_values() { + let (col1, col2) = test_data(None); + let num_rows = col1.len() as u64; + let num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); + let data = vec![col1.clone(), col2.clone()]; + + // Create file + { + let mut nippy = NippyJar::new_without_header(num_columns, file_path.path()) + .with_zstd(true, 5000) + .with_cuckoo_filter(col1.len()) + .with_mphf(); + + nippy.prepare_compression(data.clone()).unwrap(); + nippy.prepare_index(&col1).unwrap(); + nippy.freeze(data.clone(), num_rows).unwrap(); + } + + // Read file + { + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + + let mut dicts = vec![]; + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() { + dicts = zstd.generate_decompress_dictionaries().unwrap() + } + if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() { + let mut cursor = NippyJarCursor::new( + &loaded_nippy, + Some(zstd.generate_decompressors(&dicts).unwrap()), + ) + .unwrap(); + + // Shuffled for chaos. 
+ let mut data = col1.iter().zip(col2.iter()).enumerate().collect::>(); + data.shuffle(&mut rand::thread_rng()); + + // Imagine `Blocks` snapshot file has two columns: `Block | StoredWithdrawals` + const BLOCKS_FULL_MASK: usize = 0b11; + const BLOCKS_COLUMNS: usize = 2; + + // Read both columns + for (row_num, (v0, v1)) in &data { + // Simulates `by_hash` queries by iterating col1 values, which were used to + // create the inner index. + let row_by_value = cursor + .row_by_key_with_cols::(v0) + .unwrap() + .unwrap(); + assert_eq!((&row_by_value[0], &row_by_value[1]), (*v0, *v1)); + + // Simulates `by_number` queries + let row_by_num = cursor + .row_by_number_with_cols::(*row_num) + .unwrap() + .unwrap(); + assert_eq!(row_by_value, row_by_num); + } + + // Read first column only: `Block` + const BLOCKS_BLOCK_MASK: usize = 0b01; + for (row_num, (v0, _)) in &data { + // Simulates `by_hash` queries by iterating col1 values, which were used to + // create the inner index. + let row_by_value = cursor + .row_by_key_with_cols::(v0) + .unwrap() + .unwrap(); + assert_eq!(row_by_value.len(), 1); + assert_eq!(&row_by_value[0], *v0); + + // Simulates `by_number` queries + let row_by_num = cursor + .row_by_number_with_cols::(*row_num) + .unwrap() + .unwrap(); + assert_eq!(row_by_num.len(), 1); + assert_eq!(row_by_value, row_by_num); + } + + // Read second column only: `Block` + const BLOCKS_WITHDRAWAL_MASK: usize = 0b10; + for (row_num, (v0, v1)) in &data { + // Simulates `by_hash` queries by iterating col1 values, which were used to + // create the inner index. + let row_by_value = cursor + .row_by_key_with_cols::(v0) + .unwrap() + .unwrap(); + assert_eq!(row_by_value.len(), 1); + assert_eq!(&row_by_value[0], *v1); + + // Simulates `by_number` queries + let row_by_num = cursor + .row_by_number_with_cols::(*row_num) + .unwrap() + .unwrap(); + assert_eq!(row_by_num.len(), 1); + assert_eq!(row_by_value, row_by_num); + } + + // Read nothing + const BLOCKS_EMPTY_MASK: usize = 0b00; + for (row_num, (v0, _)) in &data { + // Simulates `by_hash` queries by iterating col1 values, which were used to + // create the inner index. + assert!(cursor + .row_by_key_with_cols::(v0) + .unwrap() + .unwrap() + .is_empty()); + + // Simulates `by_number` queries + assert!(cursor + .row_by_number_with_cols::(*row_num) + .unwrap() + .unwrap() + .is_empty()); + } + } + } + } +} diff --git a/crates/storage/nippy-jar/src/phf/fmph.rs b/crates/storage/nippy-jar/src/phf/fmph.rs new file mode 100644 index 000000000..62740e48a --- /dev/null +++ b/crates/storage/nippy-jar/src/phf/fmph.rs @@ -0,0 +1,104 @@ +use crate::{NippyJarError, PerfectHashingFunction}; +use ph::fmph::{BuildConf, Function}; +use serde::{ + de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize, + Serializer, +}; +use std::{clone::Clone, hash::Hash, marker::Sync}; + +/// Wrapper struct for [`Function`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453). 
+#[derive(Default)] +pub struct Fmph { + function: Option, +} + +impl Fmph { + pub fn new() -> Self { + Self { function: None } + } +} + +impl PerfectHashingFunction for Fmph { + fn set_keys + Sync + Clone + Hash>( + &mut self, + keys: &[T], + ) -> Result<(), NippyJarError> { + self.function = Some(Function::from_slice_with_conf( + keys, + BuildConf { use_multiple_threads: true, ..Default::default() }, + )); + Ok(()) + } + + fn get_index(&self, key: &[u8]) -> Result, NippyJarError> { + if let Some(f) = &self.function { + return Ok(f.get(key)) + } + Err(NippyJarError::PHFMissingKeys) + } +} + +#[cfg(test)] +impl PartialEq for Fmph { + fn eq(&self, _other: &Self) -> bool { + match (&self.function, &_other.function) { + (Some(func1), Some(func2)) => { + func1.level_sizes() == func2.level_sizes() && + func1.write_bytes() == func2.write_bytes() && + { + let mut f1 = Vec::with_capacity(func1.write_bytes()); + func1.write(&mut f1).expect("enough capacity"); + + let mut f2 = Vec::with_capacity(func2.write_bytes()); + func2.write(&mut f2).expect("enough capacity"); + + f1 == f2 + } + } + (None, None) => true, + _ => false, + } + } +} + +impl std::fmt::Debug for Fmph { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Fmph") + .field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes())) + .field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes())) + .finish_non_exhaustive() + } +} + +impl Serialize for Fmph { + /// Potentially expensive, but should be used only when creating the file. + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match &self.function { + Some(f) => { + let mut v = Vec::with_capacity(f.write_bytes()); + f.write(&mut v).map_err(S::Error::custom)?; + serializer.serialize_some(&v) + } + None => serializer.serialize_none(), + } + } +} + +impl<'de> Deserialize<'de> for Fmph { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + if let Some(buffer) = >>::deserialize(deserializer)? { + return Ok(Fmph { + function: Some( + Function::read(&mut std::io::Cursor::new(buffer)).map_err(D::Error::custom)?, + ), + }) + } + Ok(Fmph { function: None }) + } +} diff --git a/crates/storage/nippy-jar/src/phf/go_fmph.rs b/crates/storage/nippy-jar/src/phf/go_fmph.rs new file mode 100644 index 000000000..fc7b8fa89 --- /dev/null +++ b/crates/storage/nippy-jar/src/phf/go_fmph.rs @@ -0,0 +1,105 @@ +use crate::{NippyJarError, PerfectHashingFunction}; +use ph::fmph::{GOBuildConf, GOFunction}; +use serde::{ + de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize, + Serializer, +}; +use std::{clone::Clone, hash::Hash, marker::Sync}; + +/// Wrapper struct for [`GOFunction`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453). 
+#[derive(Default)]
+pub struct GoFmph {
+    function: Option<GOFunction>,
+}
+
+impl GoFmph {
+    pub fn new() -> Self {
+        Self { function: None }
+    }
+}
+
+impl PerfectHashingFunction for GoFmph {
+    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
+        &mut self,
+        keys: &[T],
+    ) -> Result<(), NippyJarError> {
+        self.function = Some(GOFunction::from_slice_with_conf(
+            keys,
+            GOBuildConf { use_multiple_threads: true, ..Default::default() },
+        ));
+        Ok(())
+    }
+
+    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
+        if let Some(f) = &self.function {
+            return Ok(f.get(key))
+        }
+        Err(NippyJarError::PHFMissingKeys)
+    }
+}
+
+#[cfg(test)]
+impl PartialEq for GoFmph {
+    fn eq(&self, other: &Self) -> bool {
+        match (&self.function, &other.function) {
+            (Some(func1), Some(func2)) => {
+                func1.level_sizes() == func2.level_sizes() &&
+                    func1.write_bytes() == func2.write_bytes() &&
+                    {
+                        let mut f1 = Vec::with_capacity(func1.write_bytes());
+                        func1.write(&mut f1).expect("enough capacity");
+
+                        let mut f2 = Vec::with_capacity(func2.write_bytes());
+                        func2.write(&mut f2).expect("enough capacity");
+
+                        f1 == f2
+                    }
+            }
+            (None, None) => true,
+            _ => false,
+        }
+    }
+}
+
+impl std::fmt::Debug for GoFmph {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GoFmph")
+            .field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes()))
+            .field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes()))
+            .finish_non_exhaustive()
+    }
+}
+
+impl Serialize for GoFmph {
+    /// Potentially expensive, but should be used only when creating the file.
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match &self.function {
+            Some(f) => {
+                let mut v = Vec::with_capacity(f.write_bytes());
+                f.write(&mut v).map_err(S::Error::custom)?;
+                serializer.serialize_some(&v)
+            }
+            None => serializer.serialize_none(),
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for GoFmph {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        if let Some(buffer) = <Option<Vec<u8>>>::deserialize(deserializer)? {
+            return Ok(GoFmph {
+                function: Some(
+                    GOFunction::read(&mut std::io::Cursor::new(buffer))
+                        .map_err(D::Error::custom)?,
+                ),
+            })
+        }
+        Ok(GoFmph { function: None })
+    }
+}
diff --git a/crates/storage/nippy-jar/src/phf/mod.rs b/crates/storage/nippy-jar/src/phf/mod.rs
new file mode 100644
index 000000000..84113181b
--- /dev/null
+++ b/crates/storage/nippy-jar/src/phf/mod.rs
@@ -0,0 +1,48 @@
+use crate::NippyJarError;
+use serde::{Deserialize, Serialize};
+use std::{clone::Clone, hash::Hash, marker::Sync};
+
+mod fmph;
+pub use fmph::Fmph;
+
+mod go_fmph;
+pub use go_fmph::GoFmph;
+
+/// Trait to build and query a perfect hashing function.
+pub trait PerfectHashingFunction: Serialize + for<'a> Deserialize<'a> {
+    /// Adds the key set and builds the perfect hashing function.
+    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
+        &mut self,
+        keys: &[T],
+    ) -> Result<(), NippyJarError>;
+
+    /// Gets the corresponding associated integer. There might be false positives.
+    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError>;
+}
+
+/// Enumerates all types of perfect hashing functions.
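// Illustrative sketch (editorial, not part of this patch): a perfect hashing function
// maps every key of the original set to a distinct integer, while keys outside that set
// may still yield `Some(_)` (a false positive), which is why lookups are verified
// against the stored data afterwards. `keys` is a placeholder slice of byte keys.
//
//   let mut phf = Functions::Fmph(Fmph::new());
//   phf.set_keys(&keys)?;
//   let index = phf.get_index(keys[0].as_ref())?; // Some(unique index)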
+#[derive(Debug, Serialize, Deserialize)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum Functions {
+    Fmph(Fmph),
+    GoFmph(GoFmph),
+}
+
+impl PerfectHashingFunction for Functions {
+    fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
+        &mut self,
+        keys: &[T],
+    ) -> Result<(), NippyJarError> {
+        match self {
+            Functions::Fmph(f) => f.set_keys(keys),
+            Functions::GoFmph(f) => f.set_keys(keys),
+        }
+    }
+
+    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
+        match self {
+            Functions::Fmph(f) => f.get_index(key),
+            Functions::GoFmph(f) => f.get_index(key),
+        }
+    }
+}
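As a compact reference, the sketch below strings the new API together end to end the same way the tests above do, but without dictionary compression so that the cursor needs no prepared decompressors. The function name, `path` and the two column vectors are placeholders; when dictionaries are enabled, decompressors must additionally be built via `generate_decompress_dictionaries`/`generate_decompressors`, as in `test_zstd_with_dictionaries`.

use reth_nippy_jar::{NippyJar, NippyJarCursor, NippyJarError};
use std::path::Path;

// Writes two columns to a jar and reads them back by row number and by key.
fn roundtrip(path: &Path, col_a: Vec<Vec<u8>>, col_b: Vec<Vec<u8>>) -> Result<(), NippyJarError> {
    let rows = col_a.len() as u64;

    // Write side: configure, build the filter/phf index from the first column, freeze.
    let mut jar = NippyJar::new_without_header(2, path)
        .with_zstd(false, 5000)
        .with_cuckoo_filter(col_a.len())
        .with_mphf();
    jar.prepare_index(&col_a)?;
    jar.freeze(vec![col_a.clone(), col_b], rows)?;

    // Read side: reload the configuration, then query rows.
    let loaded = NippyJar::load_without_header(path)?;
    let mut cursor = NippyJarCursor::new(&loaded, None)?;
    let _first_row = cursor.row_by_number(0)?;       // Some(vec![col_a[0], col_b[0]])
    let _row_by_key = cursor.row_by_key(&col_a[0])?; // filter + phf lookup, then read
    Ok(())
}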