feat: add nippy-jar format for snapshots (#4512)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Author: joshieDo
Date: 2023-09-21 18:52:32 +01:00
Committed by: GitHub
Parent: 48231c8ae9
Commit: 7a5b631273
13 changed files with 1923 additions and 3 deletions

View File

@@ -0,0 +1,35 @@
[package]
name = "reth-nippy-jar"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Immutable data store format"
[lib]
name = "reth_nippy_jar"
[dependencies]
memmap2 = "0.7.1"
bloomfilter = "1"
zstd = { version = "0.12", features = ["experimental", "zdict_builder"] }
ph = "0.8.0"
thiserror = "1.0"
bincode = "1.3"
serde = { version = "1.0", features = ["derive"] }
bytes = "1.5"
cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] }
tempfile = "3.4"
sucds = "~0.8"
anyhow = "1.0"
[dev-dependencies]
rand = { version = "0.8", features = ["small_rng"] }
[features]
default = []

View File

@@ -0,0 +1,82 @@
use crate::NippyJarError;
use serde::{Deserialize, Serialize};
use std::io::Write;
mod zstd;
pub use zstd::{Zstd, ZstdState};
/// Trait for compressing column values.
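///
/// # Example
///
/// A minimal sketch of a round trip through an implementation of this trait. It assumes a
/// single-column [`Zstd`] compressor without a dictionary; the input bytes are placeholders.
///
/// ```ignore
/// use reth_nippy_jar::compression::{Compression, Zstd};
///
/// // `Zstd::new(use_dict, max_dict_size, columns)`: no dictionary, one column.
/// let zstd = Zstd::new(false, 0, 1);
/// let compressed = zstd.compress(b"some column value").unwrap();
/// let decompressed = zstd.decompress(&compressed).unwrap();
/// assert_eq!(&decompressed, b"some column value");
/// ```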
pub trait Compression: Serialize + for<'a> Deserialize<'a> {
/// Returns decompressed data.
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError>;
/// Compresses data from `src` to `dest`.
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError>;
/// Compresses data from `src`.
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError>;
/// Returns `true` if it's ready to compress.
///
/// Example: it will return `false` if `zstd` compression with a dictionary is enabled, but the
/// dictionary has not been generated yet.
fn is_ready(&self) -> bool {
true
}
/// If required, prepares compression algorithm with an early pass on the data.
fn prepare_compression(
&mut self,
_columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<(), NippyJarError> {
Ok(())
}
}
/// Enum with different [`Compression`] types.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum Compressors {
Zstd(Zstd),
// Avoids irrefutable `let` errors. Remove this once another variant is added.
Unused,
}
impl Compression for Compressors {
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.decompress(value),
Compressors::Unused => unimplemented!(),
}
}
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.compress_to(src, dest),
Compressors::Unused => unimplemented!(),
}
}
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.compress(src),
Compressors::Unused => unimplemented!(),
}
}
fn is_ready(&self) -> bool {
match self {
Compressors::Zstd(zstd) => zstd.is_ready(),
Compressors::Unused => unimplemented!(),
}
}
fn prepare_compression(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<(), NippyJarError> {
match self {
Compressors::Zstd(zstd) => zstd.prepare_compression(columns),
Compressors::Unused => Ok(()),
}
}
}

View File

@@ -0,0 +1,239 @@
use crate::{compression::Compression, NippyJarError};
use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::{Read, Write},
};
use zstd::{
bulk::{Compressor, Decompressor},
dict::DecoderDictionary,
};
type RawDictionary = Vec<u8>;
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum ZstdState {
#[default]
PendingDictionary,
Ready,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
/// Zstd compression structure. Supports a compression dictionary per column.
pub struct Zstd {
/// State. Should be ready before compressing.
pub(crate) state: ZstdState,
/// Compression level. A level of `0` uses zstd's default (currently `3`).
pub(crate) level: i32,
/// Uses custom dictionaries to compress data.
pub(crate) use_dict: bool,
/// Maximum size of a dictionary.
pub(crate) max_dict_size: usize,
/// List of column dictionaries.
pub(crate) raw_dictionaries: Option<Vec<RawDictionary>>,
/// Number of columns to compress.
columns: usize,
}
impl Zstd {
/// Creates new [`Zstd`].
pub fn new(use_dict: bool, max_dict_size: usize, columns: usize) -> Self {
Self {
state: if use_dict { ZstdState::PendingDictionary } else { ZstdState::Ready },
level: 0,
use_dict,
max_dict_size,
raw_dictionaries: None,
columns,
}
}
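/// Sets the compression level. A level of `0` uses zstd's default (currently `3`).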
pub fn with_level(mut self, level: i32) -> Self {
self.level = level;
self
}
/// If using dictionaries, creates a list of [`DecoderDictionary`].
///
/// Consumes `self.raw_dictionaries` in the process.
pub fn generate_decompress_dictionaries<'a>(&mut self) -> Option<Vec<DecoderDictionary<'a>>> {
self.raw_dictionaries.take().map(|dicts| {
// TODO Can we use ::new instead, and avoid consuming?
dicts.iter().map(|dict| DecoderDictionary::copy(dict)).collect()
})
}
/// Creates a list of [`Decompressor`] using the given dictionaries.
pub fn generate_decompressors<'a>(
&self,
dictionaries: &'a [DecoderDictionary<'a>],
) -> Result<Vec<Decompressor<'a>>, NippyJarError> {
debug_assert!(dictionaries.len() == self.columns);
Ok(dictionaries
.iter()
.map(Decompressor::with_prepared_dictionary)
.collect::<Result<Vec<_>, _>>()?)
}
/// If using dictionaries, creates a list of [`Compressor`].
pub fn generate_compressors<'a>(&self) -> Result<Option<Vec<Compressor<'a>>>, NippyJarError> {
match self.state {
ZstdState::PendingDictionary => Err(NippyJarError::CompressorNotReady),
ZstdState::Ready => {
if !self.use_dict {
return Ok(None)
}
let mut compressors = None;
if let Some(dictionaries) = &self.raw_dictionaries {
let mut cmp = Vec::with_capacity(dictionaries.len());
for dict in dictionaries {
cmp.push(Compressor::with_dictionary(0, dict)?);
}
compressors = Some(cmp)
}
Ok(compressors)
}
}
}
/// Compresses a value using a dictionary.
pub fn compress_with_dictionary(
column_value: &[u8],
tmp_buf: &mut Vec<u8>,
handle: &mut File,
compressor: Option<&mut Compressor>,
) -> Result<(), NippyJarError> {
if let Some(compressor) = compressor {
// Compressor requires the destination buffer to be big enough to write to, otherwise it
// fails. However, we don't know how big the output will be. If the data is small
// enough, the compressed output can actually be larger than the input. We keep retrying.
// If we eventually fail, it probably means it's another kind of error.
let mut multiplier = 1;
while let Err(err) = compressor.compress_to_buffer(column_value, tmp_buf) {
tmp_buf.reserve(column_value.len() * multiplier);
multiplier += 1;
if multiplier == 5 {
return Err(NippyJarError::Disconnect(err))
}
}
handle.write_all(tmp_buf)?;
tmp_buf.clear();
} else {
handle.write_all(column_value)?;
}
Ok(())
}
/// Decompresses a value using a dictionary to a user provided buffer.
pub fn decompress_with_dictionary(
column_value: &[u8],
output: &mut Vec<u8>,
decompressor: &mut Decompressor<'_>,
) -> Result<(), NippyJarError> {
let mut multiplier = 1;
// Just an estimation.
let required_capacity = column_value.len() * 2;
output.reserve(required_capacity.saturating_sub(output.capacity()));
// Decompressor requires the destination buffer to be big enough to write to, otherwise it
// fails. However, we don't know how big it will be. We keep retrying.
// If we eventually fail, it probably means it's another kind of error.
while let Err(err) = decompressor.decompress_to_buffer(column_value, output) {
output.reserve(
Decompressor::upper_bound(column_value).unwrap_or(required_capacity) * multiplier,
);
multiplier += 1;
if multiplier == 5 {
return Err(NippyJarError::Disconnect(err))
}
}
Ok(())
}
}
impl Compression for Zstd {
fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
let mut decompressed = Vec::with_capacity(value.len() * 2);
let mut decoder = zstd::Decoder::new(value)?;
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
fn compress_to<W: Write>(&self, src: &[u8], dest: &mut W) -> Result<(), NippyJarError> {
let mut encoder = zstd::Encoder::new(dest, self.level)?;
encoder.write_all(src)?;
encoder.finish()?;
Ok(())
}
fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
let mut compressed = Vec::with_capacity(src.len());
self.compress_to(src, &mut compressed)?;
Ok(compressed)
}
fn is_ready(&self) -> bool {
matches!(self.state, ZstdState::Ready)
}
/// If dictionaries are enabled, trains one dictionary per column.
fn prepare_compression(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<(), NippyJarError> {
if !self.use_dict {
return Ok(())
}
// There's a hard limit of 2 GB on each column's sample data set for training.
// REFERENCE: https://github.com/facebook/zstd/blob/dev/programs/zstd.1.md#dictionary-builder
// ```
// -M#, --memory=#: Limit the amount of sample data loaded for training (default: 2 GB).
// Note that the default (2 GB) is also the maximum. This parameter can be useful in
// situations where the training set size is not well controlled and could be potentially
// very large. Since speed of the training process is directly correlated to the size of the
// training sample set, a smaller sample set leads to faster training.
// ```
if columns.len() != self.columns {
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
}
// TODO: parallel calculation
let mut dictionaries = vec![];
for column in columns {
// ZSTD requires all training data to be continuous in memory, alongside the size of
// each entry
let mut sizes = vec![];
let data: Vec<_> = column
.into_iter()
.flat_map(|data| {
sizes.push(data.len());
data
})
.collect();
dictionaries.push(zstd::dict::from_continuous(&data, &sizes, self.max_dict_size)?);
}
debug_assert_eq!(dictionaries.len(), self.columns);
self.raw_dictionaries = Some(dictionaries);
self.state = ZstdState::Ready;
Ok(())
}
}

View File

@@ -0,0 +1,223 @@
use crate::{
compression::{Compression, Zstd},
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, Row,
};
use memmap2::Mmap;
use serde::{de::Deserialize, ser::Serialize};
use std::{clone::Clone, fs::File};
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;
/// Simple cursor implementation to retrieve data from [`NippyJar`].
pub struct NippyJarCursor<'a, H> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Optional dictionary decompressors.
zstd_decompressors: Option<Vec<Decompressor<'a>>>,
/// Data file.
#[allow(unused)]
file_handle: File,
/// Memory-mapped data file.
mmap_handle: Mmap,
/// Temporary buffer to decompress data into (if necessary), without reallocating memory on
/// each retrieval.
tmp_buf: Vec<u8>,
/// Cursor row position.
row: u64,
}
impl<'a, H> std::fmt::Debug for NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + core::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive()
}
}
impl<'a, H> NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b>,
{
pub fn new(
jar: &'a NippyJar<H>,
zstd_decompressors: Option<Vec<Decompressor<'a>>>,
) -> Result<Self, NippyJarError> {
let file = File::open(jar.data_path())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
Ok(NippyJarCursor {
jar,
zstd_decompressors,
file_handle: file,
mmap_handle: mmap,
tmp_buf: vec![],
row: 0,
})
}
/// Resets cursor to the beginning.
pub fn reset(&mut self) {
self.row = 0;
}
/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`].
///
/// **May return false positives.**
///
/// An example use case is querying a transactions file with a transaction hash that is **NOT**
/// stored in the file.
pub fn row_by_key(&mut self, key: &[u8]) -> Result<Option<Row>, NippyJarError> {
if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) {
// TODO: is it worth parallelizing both?
// May have false positives
if filter.contains(key)? {
// May have false positives
if let Some(row_index) = phf.get_index(key)? {
self.row = self
.jar
.offsets_index
.access(row_index as usize)
.expect("built from same set") as u64;
return self.next_row()
}
}
} else {
return Err(NippyJarError::UnsupportedFilterQuery)
}
Ok(None)
}
/// Returns a row by its number.
pub fn row_by_number(&mut self, row: usize) -> Result<Option<Row>, NippyJarError> {
self.row = row as u64;
self.next_row()
}
/// Returns the current value and advances the row.
pub fn next_row(&mut self) -> Result<Option<Row>, NippyJarError> {
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
// Has reached the end
return Ok(None)
}
let mut row = Vec::with_capacity(self.jar.columns);
// Retrieve all column values from the row
for column in 0..self.jar.columns {
self.read_value(column, &mut row)?;
}
self.row += 1;
Ok(Some(row))
}
/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a
/// `MASK` to only read certain columns from the row.
///
/// **May return false positives.**
///
/// An example use case is querying a transactions file with a transaction hash that is **NOT**
/// stored in the file.
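///
/// # Example
///
/// A sketch of selecting only the first of two columns; bit `i` of `MASK` selects column `i`.
/// The `cursor` and `key` values are placeholders (see `test_selectable_column_values` for the
/// full flow).
///
/// ```ignore
/// const COLUMNS: usize = 2;
/// const FIRST_COLUMN_MASK: usize = 0b01;
///
/// let row = cursor.row_by_key_with_cols::<FIRST_COLUMN_MASK, COLUMNS>(key)?.unwrap();
/// assert_eq!(row.len(), 1);
/// ```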
pub fn row_by_key_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
key: &[u8],
) -> Result<Option<Row>, NippyJarError> {
if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) {
// TODO: is it worth parallelizing both?
// May have false positives
if filter.contains(key)? {
// May have false positives
if let Some(row_index) = phf.get_index(key)? {
self.row = self
.jar
.offsets_index
.access(row_index as usize)
.expect("built from same set") as u64;
return self.next_row_with_cols::<MASK, COLUMNS>()
}
}
} else {
return Err(NippyJarError::UnsupportedFilterQuery)
}
Ok(None)
}
/// Returns a row by its number by using a `MASK` to only read certain columns from the row.
pub fn row_by_number_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
row: usize,
) -> Result<Option<Row>, NippyJarError> {
self.row = row as u64;
self.next_row_with_cols::<MASK, COLUMNS>()
}
/// Returns the current value and advances the row.
///
/// Uses a `MASK` to only read certain columns from the row.
pub fn next_row_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
) -> Result<Option<Row>, NippyJarError> {
debug_assert!(COLUMNS == self.jar.columns);
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
// Has reached the end
return Ok(None)
}
let mut row = Vec::with_capacity(COLUMNS);
for column in 0..COLUMNS {
if MASK & (1 << column) != 0 {
self.read_value(column, &mut row)?
}
}
self.row += 1;
Ok(Some(row))
}
/// Takes the column index and reads the value for the corresponding column.
fn read_value(&mut self, column: usize, row: &mut Row) -> Result<(), NippyJarError> {
// Find out the offset of the column value
let offset_pos = self.row as usize * self.jar.columns + column;
let value_offset = self.jar.offsets.select(offset_pos).expect("should exist");
let column_value = if self.jar.offsets.len() == (offset_pos + 1) {
// It's the last column of the last row
&self.mmap_handle[value_offset..]
} else {
let next_value_offset = self.jar.offsets.select(offset_pos + 1).expect("should exist");
&self.mmap_handle[value_offset..next_value_offset]
};
if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() {
self.tmp_buf.clear();
if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) {
Zstd::decompress_with_dictionary(column_value, &mut self.tmp_buf, decompressor)?;
}
debug_assert!(!self.tmp_buf.is_empty());
row.push(self.tmp_buf.clone());
} else if let Some(compression) = &self.jar.compressor {
// Uses the chosen default decompressor
row.push(compression.decompress(column_value)?);
} else {
// Not compressed
// TODO: return Cow<&> instead of copying if there's no compression
row.push(column_value.to_vec())
}
Ok(())
}
}

View File

@@ -0,0 +1,34 @@
use thiserror::Error;
/// Errors associated with [`crate::NippyJar`].
#[derive(Debug, Error)]
pub enum NippyJarError {
#[error(transparent)]
Disconnect(#[from] std::io::Error),
#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),
#[error(transparent)]
EliasFano(#[from] anyhow::Error),
#[error("Compression was enabled, but it's not ready yet.")]
CompressorNotReady,
#[error("Decompression was enabled, but it's not ready yet.")]
DecompressorNotReady,
#[error("Number of columns does not match. {0} != {1}")]
ColumnLenMismatch(usize, usize),
#[error("UnexpectedMissingValue row: {0} col:{1}")]
UnexpectedMissingValue(u64, u64),
#[error(transparent)]
FilterError(#[from] cuckoofilter::CuckooError),
#[error("NippyJar initialized without filter.")]
FilterMissing,
#[error("Filter has reached max capacity.")]
FilterMaxCapacity,
#[error("Cuckoo was not properly initialized after loaded.")]
FilterCuckooNotLoaded,
#[error("Perfect hashing function doesn't have any keys added.")]
PHFMissingKeys,
#[error("NippyJar initialized without perfect hashing function.")]
PHFMissing,
#[error("NippyJar was built without an index.")]
UnsupportedFilterQuery,
}

View File

@@ -0,0 +1,80 @@
use super::InclusionFilter;
use crate::NippyJarError;
use cuckoofilter::{self, CuckooFilter, ExportedCuckooFilter};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::hash_map::DefaultHasher;
/// [CuckooFilter](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf). It builds and
/// provides an approximate set-membership filter to answer queries such as "Does this element
/// belong to this set?". Has a theoretical 3% false positive rate.
pub struct Cuckoo {
/// Remaining number of elements that can be added.
///
/// This is necessary because the inner implementation will fail when adding an element past its
/// capacity, **but it will still add it and evict another element**: [source](https://github.com/axiomhq/rust-cuckoofilter/tree/624da891bed1dd5d002c8fa92ce0dcd301975561#notes--todos)
remaining: usize,
/// CuckooFilter.
filter: CuckooFilter<DefaultHasher>, // TODO does it need an actual hasher?
}
impl Cuckoo {
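/// Creates a new [`Cuckoo`] filter with the given maximum capacity.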
pub fn new(max_capacity: usize) -> Self {
Cuckoo { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) }
}
}
impl InclusionFilter for Cuckoo {
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
if self.remaining == 0 {
return Err(NippyJarError::FilterMaxCapacity)
}
self.remaining -= 1;
Ok(self.filter.add(element)?)
}
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
Ok(self.filter.contains(element))
}
}
impl std::fmt::Debug for Cuckoo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Cuckoo")
.field("remaining", &self.remaining)
.field("filter_size", &self.filter.memory_usage())
.finish_non_exhaustive()
}
}
#[cfg(test)]
impl PartialEq for Cuckoo {
fn eq(&self, other: &Self) -> bool {
self.remaining == other.remaining && {
let f1 = self.filter.export();
let f2 = other.filter.export();
f1.length == f2.length && f1.values == f2.values
}
}
}
impl<'de> Deserialize<'de> for Cuckoo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let (remaining, exported): (usize, ExportedCuckooFilter) =
Deserialize::deserialize(deserializer)?;
Ok(Cuckoo { remaining, filter: exported.into() })
}
}
impl Serialize for Cuckoo {
/// Potentially expensive, but should be used only when creating the file.
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
(self.remaining, self.filter.export()).serialize(serializer)
}
}

View File

@@ -0,0 +1,39 @@
use crate::NippyJarError;
use serde::{Deserialize, Serialize};
mod cuckoo;
pub use cuckoo::Cuckoo;
/// Trait for set-membership filters.
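///
/// # Example
///
/// A minimal sketch of the add/query pattern, similar to the checks in `test_filter`; `element`
/// is a placeholder byte slice.
///
/// ```ignore
/// let mut filter = InclusionFilters::Cuckoo(Cuckoo::new(100));
/// filter.add(element)?;
/// assert!(filter.contains(element)?);
/// ```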
pub trait InclusionFilter {
/// Add element to the inclusion list.
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError>;
/// Checks if the element belongs to the inclusion list. **There might be false positives.**
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError>;
}
/// Enum with different [`InclusionFilter`] types.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum InclusionFilters {
Cuckoo(Cuckoo),
// Avoids irrefutable `let` errors. Remove this once another variant is added.
Unused,
}
impl InclusionFilter for InclusionFilters {
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
match self {
InclusionFilters::Cuckoo(c) => c.add(element),
InclusionFilters::Unused => todo!(),
}
}
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
match self {
InclusionFilters::Cuckoo(c) => c.contains(element),
InclusionFilters::Unused => todo!(),
}
}
}

View File

@@ -0,0 +1,809 @@
use serde::{Deserialize, Serialize};
use std::{
clone::Clone,
fs::File,
hash::Hash,
io::{Seek, Write},
marker::Sync,
path::{Path, PathBuf},
};
use sucds::{
int_vectors::PrefixSummedEliasFano,
mii_sequences::{EliasFano, EliasFanoBuilder},
Serializable,
};
pub mod filter;
use filter::{Cuckoo, InclusionFilter, InclusionFilters};
pub mod compression;
use compression::{Compression, Compressors};
pub mod phf;
use phf::{Fmph, Functions, GoFmph, PerfectHashingFunction};
mod error;
pub use error::NippyJarError;
mod cursor;
pub use cursor::NippyJarCursor;
const NIPPY_JAR_VERSION: usize = 1;
/// A [`Row`] is the list of column values selected from a single row.
type Row = Vec<Vec<u8>>;
/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
///
/// PHF & Filters:
/// For data membership verification, the `filter` field can be configured with algorithms like
/// Bloom or Cuckoo filters. While these filters enable rapid membership checks, it's important to
/// note that **they may yield false positives but not false negatives**. Therefore, they serve as
/// preliminary checks (eg. in `by_hash` queries) and should be followed by data verification on
/// retrieval.
///
/// The `phf` (Perfect Hashing Function) and `offsets_index` fields facilitate the data retrieval
/// process in, for example, `by_hash` queries. Specifically, the PHF converts a query, such as a
/// block hash, into a unique integer. This integer is then used as an index in `offsets_index`,
/// which maps to the actual data location in the `offsets` list. Similar to the `filter`, the PHF
/// may also produce false positives but not false negatives, necessitating subsequent data
/// verification.
///
/// Note that the key (eg. `BlockHash`) passed to a filter and PHF does not actually need to be
/// stored.
///
/// Ultimately, the `freeze` function yields two files: a data file containing both the data and its
/// configuration, and an index file that houses `offsets` and `offsets_index`.
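///
/// # Example
///
/// A rough sketch of the intended write/read flow, mirroring the tests in this module. The
/// `path`, `col0`, `col1` and `num_rows` values are placeholders.
///
/// ```ignore
/// // Write: two columns compressed with zstd (no dictionaries), plus a cuckoo filter and a
/// // perfect hashing function keyed by the first column.
/// let mut jar = NippyJar::new_without_header(2, path)
///     .with_zstd(false, 0)
///     .with_cuckoo_filter(col0.len())
///     .with_mphf();
/// jar.prepare_index(&col0)?;
/// jar.freeze(vec![col0.clone(), col1.clone()], num_rows)?;
///
/// // Read: load the configuration back and query rows through a cursor.
/// let loaded = NippyJar::load_without_header(path)?;
/// let mut cursor = NippyJarCursor::new(&loaded, None)?;
/// let row = cursor.row_by_key(&col0[0])?;
/// ```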
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
/// The version of the NippyJar format.
version: usize,
/// User-defined header data.
/// Defaults to the zero-sized unit type, i.e. no header data.
user_header: H,
/// Number of data columns in the jar.
columns: usize,
/// Optional compression algorithm applied to the data.
compressor: Option<Compressors>,
/// Optional filter function for data membership checks.
filter: Option<InclusionFilters>,
/// Optional Perfect Hashing Function (PHF) for unique offset mapping.
phf: Option<Functions>,
/// Index mapping PHF output to value offsets in `offsets`.
#[serde(skip)]
offsets_index: PrefixSummedEliasFano,
/// Offsets within the file for each column value, arranged by row and column.
#[serde(skip)]
offsets: EliasFano,
/// Data file path. The index file will be located at `{path}.idx`.
#[serde(skip)]
path: Option<PathBuf>,
}
impl NippyJar<()> {
/// Creates a new [`NippyJar`] without user-defined header data.
pub fn new_without_header(columns: usize, path: &Path) -> Self {
NippyJar::<()>::new(columns, path, ())
}
/// Loads the file configuration and returns [`Self`] for a jar without user-defined header data.
pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
NippyJar::<()>::load(path)
}
}
impl<H> NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
/// Creates a new [`NippyJar`] with user-defined header data.
pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
NippyJar {
version: NIPPY_JAR_VERSION,
user_header,
columns,
compressor: None,
filter: None,
phf: None,
offsets: EliasFano::default(),
offsets_index: PrefixSummedEliasFano::default(),
path: Some(path.to_path_buf()),
}
}
/// Adds [`compression::Zstd`] compression.
pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
self.compressor =
Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
self
}
/// Adds [`filter::Cuckoo`] filter.
pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self {
self.filter = Some(InclusionFilters::Cuckoo(Cuckoo::new(max_capacity)));
self
}
/// Adds [`phf::Fmph`] perfect hashing function.
pub fn with_mphf(mut self) -> Self {
self.phf = Some(Functions::Fmph(Fmph::new()));
self
}
/// Adds [`phf::GoFmph`] perfect hashing function.
pub fn with_gomphf(mut self) -> Self {
self.phf = Some(Functions::GoFmph(GoFmph::new()));
self
}
/// Gets a reference to the user header.
pub fn user_header(&self) -> &H {
&self.user_header
}
/// Loads the file configuration and returns [`Self`].
///
/// **The user must ensure the header type matches the one used during the jar's creation.**
pub fn load(path: &Path) -> Result<Self, NippyJarError> {
// Read [`Self`] located at the data file.
let data_file = File::open(path)?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let data_reader = unsafe { memmap2::Mmap::map(&data_file)? };
let mut obj: Self = bincode::deserialize_from(data_reader.as_ref())?;
obj.path = Some(path.to_path_buf());
// Read the offsets lists located at the index file.
let offsets_file = File::open(obj.index_path())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { memmap2::Mmap::map(&offsets_file)? };
let mut offsets_reader = mmap.as_ref();
obj.offsets = EliasFano::deserialize_from(&mut offsets_reader)?;
obj.offsets_index = PrefixSummedEliasFano::deserialize_from(offsets_reader)?;
Ok(obj)
}
/// Returns the path of the data file.
pub fn data_path(&self) -> PathBuf {
self.path.clone().expect("exists")
}
/// Returns the path of the index file.
pub fn index_path(&self) -> PathBuf {
let data_path = self.data_path();
data_path
.parent()
.expect("exists")
.join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy()))
}
/// If required, prepares any compression algorithm with an early pass on the data.
pub fn prepare_compression(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<(), NippyJarError> {
// Makes any necessary preparations for the compressors
if let Some(compression) = &mut self.compressor {
compression.prepare_compression(columns)?;
}
Ok(())
}
/// Prepares the offsets index beforehand so that rows can later be queried by `values` (eg.
/// transaction hash). Expects `values` to be sorted in the same order as the data that is going
/// to be inserted later on.
pub fn prepare_index<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
values: &[T],
) -> Result<(), NippyJarError> {
let mut offsets_index = vec![0; values.len()];
// Builds perfect hashing function from the values
if let Some(phf) = self.phf.as_mut() {
phf.set_keys(values)?;
}
if self.filter.is_some() || self.phf.is_some() {
for (row_num, v) in values.iter().enumerate() {
if let Some(filter) = self.filter.as_mut() {
filter.add(v.as_ref())?;
}
if let Some(phf) = self.phf.as_mut() {
// Points to the first column value offset of the row.
let index = phf.get_index(v.as_ref())?.expect("initialized") as usize;
let _ = std::mem::replace(&mut offsets_index[index], row_num as u64);
}
}
}
self.offsets_index = PrefixSummedEliasFano::from_slice(&offsets_index)?;
Ok(())
}
/// Writes all data and configuration to a file and the offset index to another.
pub fn freeze(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
total_rows: u64,
) -> Result<(), NippyJarError> {
let mut file = self.freeze_check(&columns)?;
self.freeze_config(&mut file)?;
// Special case for zstd that might use custom dictionaries/compressors per column
// If any other compression algorithm is added and uses a similar flow, then revisit
// implementation
let mut maybe_zstd_compressors = None;
if let Some(Compressors::Zstd(zstd)) = &self.compressor {
maybe_zstd_compressors = zstd.generate_compressors()?;
}
// Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/
// dict)
let mut tmp_buf = Vec::with_capacity(100);
// Write all rows while taking all row start offsets
let mut row_number = 0u64;
let mut offsets = Vec::with_capacity(total_rows as usize * self.columns);
let mut column_iterators =
columns.into_iter().map(|v| v.into_iter()).collect::<Vec<_>>().into_iter();
loop {
let mut iterators = Vec::with_capacity(self.columns);
// Write the column value of each row
// TODO: iter_mut if we remove the IntoIterator interface.
for (column_number, mut column_iter) in column_iterators.enumerate() {
offsets.push(file.stream_position()? as usize);
match column_iter.next() {
Some(value) => {
if let Some(compression) = &self.compressor {
// Special zstd case with dictionaries
if let (Some(dict_compressors), Compressors::Zstd(_)) =
(maybe_zstd_compressors.as_mut(), compression)
{
compression::Zstd::compress_with_dictionary(
&value,
&mut tmp_buf,
&mut file,
Some(dict_compressors.get_mut(column_number).expect("exists")),
)?;
} else {
compression.compress_to(&value, &mut file)?;
}
} else {
file.write_all(&value)?;
}
}
None => {
return Err(NippyJarError::UnexpectedMissingValue(
row_number,
column_number as u64,
))
}
}
iterators.push(column_iter);
}
row_number += 1;
if row_number == total_rows {
break
}
column_iterators = iterators.into_iter();
}
// Write offsets and offset index to file
self.freeze_offsets(offsets)?;
Ok(())
}
/// Freezes offsets and its own index.
fn freeze_offsets(&mut self, offsets: Vec<usize>) -> Result<(), NippyJarError> {
if !offsets.is_empty() {
let mut builder =
EliasFanoBuilder::new(*offsets.last().expect("qed") + 1, offsets.len())?;
for offset in offsets {
builder.push(offset)?;
}
self.offsets = builder.build().enable_rank();
}
let mut file = File::create(self.index_path())?;
self.offsets.serialize_into(&mut file)?;
self.offsets_index.serialize_into(file)?;
Ok(())
}
/// Safety checks before creating and returning a [`File`] handle to write data to.
fn freeze_check(
&mut self,
columns: &Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<File, NippyJarError> {
if columns.len() != self.columns {
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
}
if let Some(compression) = &self.compressor {
if !compression.is_ready() {
return Err(NippyJarError::CompressorNotReady)
}
}
// Check `prepare_index` was called.
if let Some(phf) = &self.phf {
let _ = phf.get_index(&[])?;
}
Ok(File::create(self.data_path())?)
}
/// Writes all necessary configuration to file.
fn freeze_config(&mut self, handle: &mut File) -> Result<(), NippyJarError> {
// TODO Split Dictionaries and Bloomfilters Configuration so we don't have to load everything
// at once
Ok(bincode::serialize_into(handle, &self)?)
}
}
impl<H> InclusionFilter for NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
}
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
}
}
impl<H> PerfectHashingFunction for NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
keys: &[T],
) -> Result<(), NippyJarError> {
self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
use std::collections::HashSet;
type ColumnValues = Vec<Vec<u8>>;
fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
let value_length = 32;
let num_rows = 100;
let mut vec: Vec<u8> = vec![0; value_length];
let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_entropy);
let mut gen = || {
(0..num_rows)
.map(|_| {
rng.fill_bytes(&mut vec[..]);
vec.clone()
})
.collect()
};
(gen(), gen())
}
#[test]
fn test_phf() {
let (col1, col2) = test_data(None);
let num_columns = 2;
let num_rows = col1.len() as u64;
let file_path = tempfile::NamedTempFile::new().unwrap();
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(matches!(NippyJar::set_keys(&mut nippy, &col1), Err(NippyJarError::PHFMissing)));
let check_phf = |nippy: &mut NippyJar<_>| {
assert!(matches!(
NippyJar::get_index(nippy, &col1[0]),
Err(NippyJarError::PHFMissingKeys)
));
assert!(NippyJar::set_keys(nippy, &col1).is_ok());
let collect_indexes = |nippy: &NippyJar<_>| -> Vec<u64> {
col1.iter()
.map(|value| NippyJar::get_index(nippy, value.as_slice()).unwrap().unwrap())
.collect()
};
// Ensure all indexes are unique
let indexes = collect_indexes(nippy);
assert_eq!(indexes.iter().collect::<HashSet<_>>().len(), indexes.len());
// Ensure reproducibility
assert!(NippyJar::set_keys(nippy, &col1).is_ok());
assert_eq!(indexes, collect_indexes(nippy));
// Ensure that loaded phf provides the same function outputs
nippy.prepare_index(&col1).unwrap();
nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(indexes, collect_indexes(&loaded_nippy));
};
// mphf bytes size for 100 values of 32 bytes: 54
nippy = nippy.with_mphf();
check_phf(&mut nippy);
// mphf bytes size for 100 values of 32 bytes: 46
nippy = nippy.with_gomphf();
check_phf(&mut nippy);
}
#[test]
fn test_filter() {
let (col1, col2) = test_data(Some(1));
let num_columns = 2;
let num_rows = col1.len() as u64;
let file_path = tempfile::NamedTempFile::new().unwrap();
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(matches!(
InclusionFilter::add(&mut nippy, &col1[0]),
Err(NippyJarError::FilterMissing)
));
nippy = nippy.with_cuckoo_filter(4);
// Add col1[0]
assert!(!InclusionFilter::contains(&nippy, &col1[0]).unwrap());
assert!(InclusionFilter::add(&mut nippy, &col1[0]).is_ok());
assert!(InclusionFilter::contains(&nippy, &col1[0]).unwrap());
// Add col1[1]
assert!(!InclusionFilter::contains(&nippy, &col1[1]).unwrap());
assert!(InclusionFilter::add(&mut nippy, &col1[1]).is_ok());
assert!(InclusionFilter::contains(&nippy, &col1[1]).unwrap());
// Add more elements until max_capacity is reached
assert!(InclusionFilter::add(&mut nippy, &col1[2]).is_ok());
assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok());
assert!(matches!(
InclusionFilter::add(&mut nippy, &col1[4]),
Err(NippyJarError::FilterMaxCapacity)
));
nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
assert!(InclusionFilter::contains(&loaded_nippy, &col1[0]).unwrap());
assert!(InclusionFilter::contains(&loaded_nippy, &col1[1]).unwrap());
assert!(InclusionFilter::contains(&loaded_nippy, &col1[2]).unwrap());
assert!(InclusionFilter::contains(&loaded_nippy, &col1[3]).unwrap());
assert!(!InclusionFilter::contains(&loaded_nippy, &col1[4]).unwrap());
}
#[test]
fn test_zstd_with_dictionaries() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor.is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor.is_some());
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor {
assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady)));
// Make sure the number of provided columns matches the initially configured one.
assert!(matches!(
zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
));
}
let data = vec![col1.clone(), col2.clone()];
// If ZSTD is enabled, do not write to the file unless the column dictionaries have been
// calculated.
assert!(matches!(
nippy.freeze(data.clone(), num_rows),
Err(NippyJarError::CompressorNotReady)
));
nippy.prepare_compression(data.clone()).unwrap();
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor {
assert!(matches!(
(&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())),
(compression::ZstdState::Ready, Some(columns)) if columns == num_columns
));
}
nippy.freeze(data.clone(), num_rows).unwrap();
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index]));
row_index += 1;
}
}
}
#[test]
fn test_zstd_no_dictionaries() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor.is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
assert!(nippy.compressor.is_some());
let data = vec![col1.clone(), col2.clone()];
nippy.freeze(data.clone(), num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
assert!(!zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&data[0][row_index], &data[1][row_index]));
row_index += 1;
}
} else {
panic!("Expected Zstd compressor")
}
}
/// Tests NippyJar with everything enabled: compression, filter, offset list and offset index.
#[test]
fn test_full_nippy_jar() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
let block_start = 500;
#[derive(Serialize, Deserialize, Debug)]
pub struct BlockJarHeader {
block_start: usize,
}
// Create file
{
let mut nippy =
NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_mphf();
nippy.prepare_compression(data.clone()).unwrap();
nippy.prepare_index(&col1).unwrap();
nippy.freeze(data.clone(), num_rows).unwrap();
}
// Read file
{
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
assert!(loaded_nippy.compressor.is_some());
assert!(loaded_nippy.filter.is_some());
assert!(loaded_nippy.phf.is_some());
assert_eq!(loaded_nippy.user_header().block_start, block_start);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
// Iterate over compressed values and compare
let mut row_num = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!((&row[0], &row[1]), (&data[0][row_num], &data[1][row_num]));
row_num += 1;
}
// Shuffled for chaos.
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
for (row_num, (v0, v1)) in data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor.row_by_key(v0).unwrap().unwrap();
assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1));
// Simulates `by_number` queries
let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
assert_eq!(row_by_value, row_by_num);
}
}
}
}
#[test]
fn test_selectable_column_values() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
// Create file
{
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path())
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_mphf();
nippy.prepare_compression(data.clone()).unwrap();
nippy.prepare_index(&col1).unwrap();
nippy.freeze(data.clone(), num_rows).unwrap();
}
// Read file
{
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor.as_ref() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
// Shuffled for chaos.
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
// Imagine a `Blocks` snapshot file with two columns: `Block | StoredWithdrawals`
const BLOCKS_FULL_MASK: usize = 0b11;
const BLOCKS_COLUMNS: usize = 2;
// Read both columns
for (row_num, (v0, v1)) in &data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_FULL_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
assert_eq!((&row_by_value[0], &row_by_value[1]), (*v0, *v1));
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_FULL_MASK, BLOCKS_COLUMNS>(*row_num)
.unwrap()
.unwrap();
assert_eq!(row_by_value, row_by_num);
}
// Read first column only: `Block`
const BLOCKS_BLOCK_MASK: usize = 0b01;
for (row_num, (v0, _)) in &data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_BLOCK_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v0);
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_BLOCK_MASK, BLOCKS_COLUMNS>(*row_num)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(row_by_value, row_by_num);
}
// Read second column only: `StoredWithdrawals`
const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
for (row_num, (v0, v1)) in &data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_WITHDRAWAL_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v1);
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_WITHDRAWAL_MASK, BLOCKS_COLUMNS>(*row_num)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(row_by_value, row_by_num);
}
// Read nothing
const BLOCKS_EMPTY_MASK: usize = 0b00;
for (row_num, (v0, _)) in &data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
assert!(cursor
.row_by_key_with_cols::<BLOCKS_EMPTY_MASK, BLOCKS_COLUMNS>(v0)
.unwrap()
.unwrap()
.is_empty());
// Simulates `by_number` queries
assert!(cursor
.row_by_number_with_cols::<BLOCKS_EMPTY_MASK, BLOCKS_COLUMNS>(*row_num)
.unwrap()
.unwrap()
.is_empty());
}
}
}
}
}

View File

@@ -0,0 +1,104 @@
use crate::{NippyJarError, PerfectHashingFunction};
use ph::fmph::{BuildConf, Function};
use serde::{
de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize,
Serializer,
};
use std::{clone::Clone, hash::Hash, marker::Sync};
/// Wrapper struct for [`Function`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453).
#[derive(Default)]
pub struct Fmph {
function: Option<Function>,
}
impl Fmph {
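/// Creates a new [`Fmph`] with no keys set yet.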
pub fn new() -> Self {
Self { function: None }
}
}
impl PerfectHashingFunction for Fmph {
fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
keys: &[T],
) -> Result<(), NippyJarError> {
self.function = Some(Function::from_slice_with_conf(
keys,
BuildConf { use_multiple_threads: true, ..Default::default() },
));
Ok(())
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
if let Some(f) = &self.function {
return Ok(f.get(key))
}
Err(NippyJarError::PHFMissingKeys)
}
}
#[cfg(test)]
impl PartialEq for Fmph {
fn eq(&self, other: &Self) -> bool {
match (&self.function, &other.function) {
(Some(func1), Some(func2)) => {
func1.level_sizes() == func2.level_sizes() &&
func1.write_bytes() == func2.write_bytes() &&
{
let mut f1 = Vec::with_capacity(func1.write_bytes());
func1.write(&mut f1).expect("enough capacity");
let mut f2 = Vec::with_capacity(func2.write_bytes());
func2.write(&mut f2).expect("enough capacity");
f1 == f2
}
}
(None, None) => true,
_ => false,
}
}
}
impl std::fmt::Debug for Fmph {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Fmph")
.field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes()))
.field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes()))
.finish_non_exhaustive()
}
}
impl Serialize for Fmph {
/// Potentially expensive, but should be used only when creating the file.
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match &self.function {
Some(f) => {
let mut v = Vec::with_capacity(f.write_bytes());
f.write(&mut v).map_err(S::Error::custom)?;
serializer.serialize_some(&v)
}
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for Fmph {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
if let Some(buffer) = <Option<Vec<u8>>>::deserialize(deserializer)? {
return Ok(Fmph {
function: Some(
Function::read(&mut std::io::Cursor::new(buffer)).map_err(D::Error::custom)?,
),
})
}
Ok(Fmph { function: None })
}
}

View File

@@ -0,0 +1,105 @@
use crate::{NippyJarError, PerfectHashingFunction};
use ph::fmph::{GOBuildConf, GOFunction};
use serde::{
de::Error as DeSerdeError, ser::Error as SerdeError, Deserialize, Deserializer, Serialize,
Serializer,
};
use std::{clone::Clone, hash::Hash, marker::Sync};
/// Wrapper struct for [`GOFunction`]. Implementation of the following [paper](https://dl.acm.org/doi/10.1145/3596453).
#[derive(Default)]
pub struct GoFmph {
function: Option<GOFunction>,
}
impl GoFmph {
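/// Creates a new [`GoFmph`] with no keys set yet.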
pub fn new() -> Self {
Self { function: None }
}
}
impl PerfectHashingFunction for GoFmph {
fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
keys: &[T],
) -> Result<(), NippyJarError> {
self.function = Some(GOFunction::from_slice_with_conf(
keys,
GOBuildConf { use_multiple_threads: true, ..Default::default() },
));
Ok(())
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
if let Some(f) = &self.function {
return Ok(f.get(key))
}
Err(NippyJarError::PHFMissingKeys)
}
}
#[cfg(test)]
impl PartialEq for GoFmph {
fn eq(&self, other: &Self) -> bool {
match (&self.function, &other.function) {
(Some(func1), Some(func2)) => {
func1.level_sizes() == func2.level_sizes() &&
func1.write_bytes() == func2.write_bytes() &&
{
let mut f1 = Vec::with_capacity(func1.write_bytes());
func1.write(&mut f1).expect("enough capacity");
let mut f2 = Vec::with_capacity(func2.write_bytes());
func2.write(&mut f2).expect("enough capacity");
f1 == f2
}
}
(None, None) => true,
_ => false,
}
}
}
impl std::fmt::Debug for GoFmph {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GoFmph")
.field("level_sizes", &self.function.as_ref().map(|f| f.level_sizes()))
.field("bytes_size", &self.function.as_ref().map(|f| f.write_bytes()))
.finish_non_exhaustive()
}
}
impl Serialize for GoFmph {
/// Potentially expensive, but should be used only when creating the file.
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match &self.function {
Some(f) => {
let mut v = Vec::with_capacity(f.write_bytes());
f.write(&mut v).map_err(S::Error::custom)?;
serializer.serialize_some(&v)
}
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for GoFmph {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
if let Some(buffer) = <Option<Vec<u8>>>::deserialize(deserializer)? {
return Ok(GoFmph {
function: Some(
GOFunction::read(&mut std::io::Cursor::new(buffer))
.map_err(D::Error::custom)?,
),
})
}
Ok(GoFmph { function: None })
}
}

View File

@@ -0,0 +1,48 @@
use crate::NippyJarError;
use serde::{Deserialize, Serialize};
use std::{clone::Clone, hash::Hash, marker::Sync};
mod fmph;
pub use fmph::Fmph;
mod go_fmph;
pub use go_fmph::GoFmph;
/// Trait to build and query a perfect hashing function.
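///
/// # Example
///
/// A minimal sketch of the build/query pattern, similar to `test_phf`; `keys` is a placeholder
/// slice of byte strings.
///
/// ```ignore
/// let mut phf = Functions::Fmph(Fmph::new());
/// phf.set_keys(&keys)?;
/// let index = phf.get_index(keys[0].as_ref())?;
/// ```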
pub trait PerfectHashingFunction: Serialize + for<'a> Deserialize<'a> {
/// Adds the key set and builds the perfect hashing function.
fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
keys: &[T],
) -> Result<(), NippyJarError>;
/// Gets the corresponding associated integer. There might be false positives.
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError>;
}
/// Enumerates all types of perfect hashing functions.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum Functions {
Fmph(Fmph),
GoFmph(GoFmph),
}
impl PerfectHashingFunction for Functions {
fn set_keys<T: AsRef<[u8]> + Sync + Clone + Hash>(
&mut self,
keys: &[T],
) -> Result<(), NippyJarError> {
match self {
Functions::Fmph(f) => f.set_keys(keys),
Functions::GoFmph(f) => f.set_keys(keys),
}
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
match self {
Functions::Fmph(f) => f.get_index(key),
Functions::GoFmph(f) => f.get_index(key),
}
}
}