mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore: remove PerfectHasingFunction Filters (#10627)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
35
Cargo.lock
generated
35
Cargo.lock
generated
@ -2175,20 +2175,6 @@ dependencies = [
|
||||
"cipher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuckoofilter"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"fnv",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek"
|
||||
version = "4.1.3"
|
||||
@ -7486,14 +7472,12 @@ version = "1.0.6"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"cuckoofilter",
|
||||
"derive_more 1.0.0",
|
||||
"lz4_flex",
|
||||
"memmap2",
|
||||
"rand 0.8.5",
|
||||
"reth-fs-util",
|
||||
"serde",
|
||||
"sucds",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
@ -9329,15 +9313,6 @@ dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_bytes"
|
||||
version = "0.11.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.209"
|
||||
@ -9773,16 +9748,6 @@ version = "2.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||
|
||||
[[package]]
|
||||
name = "sucds"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d53d46182afe6ed822a94c54a532dc0d59691a8f49226bdc4596529ca864cdd6"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "12.11.0"
|
||||
|
||||
@ -1,38 +0,0 @@
|
||||
use strum::AsRefStr;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
/// Static File filters.
|
||||
pub enum Filters {
|
||||
/// Static File uses filters with [`InclusionFilter`] and [`PerfectHashingFunction`].
|
||||
WithFilters(InclusionFilter, PerfectHashingFunction),
|
||||
/// Static File doesn't use any filters.
|
||||
WithoutFilters,
|
||||
}
|
||||
|
||||
impl Filters {
|
||||
/// Returns `true` if static file uses filters.
|
||||
pub const fn has_filters(&self) -> bool {
|
||||
matches!(self, Self::WithFilters(_, _))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, AsRefStr)]
|
||||
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
|
||||
/// Static File inclusion filter. Also see [Filters].
|
||||
pub enum InclusionFilter {
|
||||
#[strum(serialize = "cuckoo")]
|
||||
/// Cuckoo filter
|
||||
Cuckoo,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, AsRefStr)]
|
||||
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
|
||||
/// Static File perfect hashing function. Also see [Filters].
|
||||
pub enum PerfectHashingFunction {
|
||||
#[strum(serialize = "fmph")]
|
||||
/// Fingerprint-Based Minimal Perfect Hash Function
|
||||
Fmph,
|
||||
#[strum(serialize = "gofmph")]
|
||||
/// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization
|
||||
GoFmph,
|
||||
}
|
||||
@ -9,12 +9,10 @@
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
mod compression;
|
||||
mod filters;
|
||||
mod segment;
|
||||
|
||||
use alloy_primitives::BlockNumber;
|
||||
pub use compression::Compression;
|
||||
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
|
||||
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
|
||||
|
||||
/// Default static file block count.
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use crate::{BlockNumber, Compression, Filters, InclusionFilter};
|
||||
use crate::{BlockNumber, Compression};
|
||||
use alloy_primitives::TxNumber;
|
||||
use derive_more::Display;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -48,17 +48,7 @@ impl StaticFileSegment {
|
||||
|
||||
/// Returns the default configuration of the segment.
|
||||
pub const fn config(&self) -> SegmentConfig {
|
||||
let default_config = SegmentConfig {
|
||||
filters: Filters::WithFilters(
|
||||
InclusionFilter::Cuckoo,
|
||||
super::PerfectHashingFunction::Fmph,
|
||||
),
|
||||
compression: Compression::Lz4,
|
||||
};
|
||||
|
||||
match self {
|
||||
Self::Headers | Self::Transactions | Self::Receipts => default_config,
|
||||
}
|
||||
SegmentConfig { compression: Compression::Lz4 }
|
||||
}
|
||||
|
||||
/// Returns the number of columns for the segment
|
||||
@ -79,18 +69,12 @@ impl StaticFileSegment {
|
||||
/// Returns file name for the provided segment and range, alongside filters, compression.
|
||||
pub fn filename_with_configuration(
|
||||
&self,
|
||||
filters: Filters,
|
||||
compression: Compression,
|
||||
block_range: &SegmentRangeInclusive,
|
||||
) -> String {
|
||||
let prefix = self.filename(block_range);
|
||||
|
||||
let filters_name = match filters {
|
||||
Filters::WithFilters(inclusion_filter, phf) => {
|
||||
format!("{}-{}", inclusion_filter.as_ref(), phf.as_ref())
|
||||
}
|
||||
Filters::WithoutFilters => "none".to_string(),
|
||||
};
|
||||
let filters_name = "none".to_string();
|
||||
|
||||
// ATTENTION: if changing the name format, be sure to reflect those changes in
|
||||
// [`Self::parse_filename`.]
|
||||
@ -306,8 +290,6 @@ impl SegmentHeader {
|
||||
/// Configuration used on the segment.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct SegmentConfig {
|
||||
/// Inclusion filters used on the segment
|
||||
pub filters: Filters,
|
||||
/// Compression used on the segment
|
||||
pub compression: Compression,
|
||||
}
|
||||
@ -380,46 +362,28 @@ mod tests {
|
||||
(
|
||||
StaticFileSegment::Headers,
|
||||
2..=30,
|
||||
"static_file_headers_2_30_cuckoo-fmph_lz4",
|
||||
Some((
|
||||
Compression::Lz4,
|
||||
Filters::WithFilters(
|
||||
InclusionFilter::Cuckoo,
|
||||
crate::PerfectHashingFunction::Fmph,
|
||||
),
|
||||
)),
|
||||
"static_file_headers_2_30_none_lz4",
|
||||
Some(Compression::Lz4),
|
||||
),
|
||||
(
|
||||
StaticFileSegment::Headers,
|
||||
2..=30,
|
||||
"static_file_headers_2_30_cuckoo-fmph_zstd",
|
||||
Some((
|
||||
Compression::Zstd,
|
||||
Filters::WithFilters(
|
||||
InclusionFilter::Cuckoo,
|
||||
crate::PerfectHashingFunction::Fmph,
|
||||
),
|
||||
)),
|
||||
"static_file_headers_2_30_none_zstd",
|
||||
Some(Compression::Zstd),
|
||||
),
|
||||
(
|
||||
StaticFileSegment::Headers,
|
||||
2..=30,
|
||||
"static_file_headers_2_30_cuckoo-fmph_zstd-dict",
|
||||
Some((
|
||||
Compression::ZstdWithDictionary,
|
||||
Filters::WithFilters(
|
||||
InclusionFilter::Cuckoo,
|
||||
crate::PerfectHashingFunction::Fmph,
|
||||
),
|
||||
)),
|
||||
"static_file_headers_2_30_none_zstd-dict",
|
||||
Some(Compression::ZstdWithDictionary),
|
||||
),
|
||||
];
|
||||
|
||||
for (segment, block_range, filename, configuration) in test_vectors {
|
||||
for (segment, block_range, filename, compression) in test_vectors {
|
||||
let block_range: SegmentRangeInclusive = block_range.into();
|
||||
if let Some((compression, filters)) = configuration {
|
||||
if let Some(compression) = compression {
|
||||
assert_eq!(
|
||||
segment.filename_with_configuration(filters, compression, &block_range,),
|
||||
segment.filename_with_configuration(compression, &block_range),
|
||||
filename
|
||||
);
|
||||
} else {
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use reth_db_api::table::Decompress;
|
||||
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor, NippyJarError};
|
||||
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor};
|
||||
use reth_primitives::{static_file::SegmentHeader, B256};
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult};
|
||||
use std::sync::Arc;
|
||||
@ -39,7 +39,7 @@ impl<'a> StaticFileCursor<'a> {
|
||||
}
|
||||
|
||||
let row = match key_or_num {
|
||||
KeyOrNumber::Key(_) => Err(NippyJarError::UnsupportedFilterQuery),
|
||||
KeyOrNumber::Key(_) => unimplemented!(),
|
||||
KeyOrNumber::Number(n) => match self.jar().user_header().start() {
|
||||
Some(offset) => {
|
||||
if offset > n {
|
||||
|
||||
@ -18,19 +18,10 @@ name = "reth_nippy_jar"
|
||||
# reth
|
||||
reth-fs-util.workspace = true
|
||||
|
||||
# filter
|
||||
cuckoofilter = { version = "0.5.0", features = [
|
||||
"serde_support",
|
||||
"serde_bytes",
|
||||
] }
|
||||
|
||||
# compression
|
||||
zstd = { workspace = true, features = ["experimental", "zdict_builder"] }
|
||||
lz4_flex = { version = "0.11", default-features = false }
|
||||
|
||||
# offsets
|
||||
sucds = "~0.8"
|
||||
|
||||
memmap2 = "0.9.4"
|
||||
bincode = "1.3"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@ -24,16 +24,6 @@ pub enum NippyJarError {
|
||||
ColumnLenMismatch(usize, usize),
|
||||
#[error("unexpected missing value: row:col {0}:{1}")]
|
||||
UnexpectedMissingValue(u64, u64),
|
||||
#[error(transparent)]
|
||||
EthFilterError(#[from] cuckoofilter::CuckooError),
|
||||
#[error("nippy jar initialized without filter")]
|
||||
FilterMissing,
|
||||
#[error("filter has reached max capacity")]
|
||||
FilterMaxCapacity,
|
||||
#[error("cuckoo was not properly initialized after loaded")]
|
||||
FilterCuckooNotLoaded,
|
||||
#[error("nippy jar was built without an index")]
|
||||
UnsupportedFilterQuery,
|
||||
#[error("the size of an offset must be at most 8 bytes, got {offset_size}")]
|
||||
OffsetSizeTooBig {
|
||||
/// The read offset size in number of bytes.
|
||||
|
||||
@ -1,88 +0,0 @@
|
||||
use super::InclusionFilter;
|
||||
use crate::NippyJarError;
|
||||
use cuckoofilter::{CuckooFilter, ExportedCuckooFilter};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
|
||||
/// [CuckooFilter](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf). It builds and provides an approximated set-membership filter to answer queries such as "Does this element belong to this set?". Has a theoretical 3% false positive rate.
|
||||
pub struct Cuckoo {
|
||||
/// Remaining number of elements that can be added.
|
||||
///
|
||||
/// This is necessary because the inner implementation will fail on adding an element past capacity, **but it will still add it and remove other**: [source](https://github.com/axiomhq/rust-cuckoofilter/tree/624da891bed1dd5d002c8fa92ce0dcd301975561#notes--todos)
|
||||
remaining: usize,
|
||||
|
||||
/// `CuckooFilter`.
|
||||
filter: CuckooFilter<DefaultHasher>, // TODO does it need an actual hasher?
|
||||
}
|
||||
|
||||
impl Cuckoo {
|
||||
pub fn new(max_capacity: usize) -> Self {
|
||||
// CuckooFilter might return `NotEnoughSpace` even if they are remaining elements, if it's
|
||||
// close to capacity. Therefore, we increase it.
|
||||
let max_capacity = max_capacity + 100 + max_capacity / 3;
|
||||
|
||||
Self { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) }
|
||||
}
|
||||
}
|
||||
|
||||
impl InclusionFilter for Cuckoo {
|
||||
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
|
||||
if self.remaining == 0 {
|
||||
return Err(NippyJarError::FilterMaxCapacity)
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
|
||||
Ok(self.filter.add(element)?)
|
||||
}
|
||||
|
||||
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
|
||||
Ok(self.filter.contains(element))
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
self.filter.memory_usage()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Cuckoo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Cuckoo")
|
||||
.field("remaining", &self.remaining)
|
||||
.field("filter_size", &self.filter.memory_usage())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl PartialEq for Cuckoo {
|
||||
fn eq(&self, _other: &Self) -> bool {
|
||||
self.remaining == _other.remaining && {
|
||||
let f1 = self.filter.export();
|
||||
let f2 = _other.filter.export();
|
||||
f1.length == f2.length && f1.values == f2.values
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Cuckoo {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let (remaining, exported): (usize, ExportedCuckooFilter) =
|
||||
Deserialize::deserialize(deserializer)?;
|
||||
|
||||
Ok(Self { remaining, filter: exported.into() })
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Cuckoo {
|
||||
/// Potentially expensive, but should be used only when creating the file.
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
(self.remaining, self.filter.export()).serialize(serializer)
|
||||
}
|
||||
}
|
||||
@ -1,48 +0,0 @@
|
||||
use crate::NippyJarError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
mod cuckoo;
|
||||
pub use cuckoo::Cuckoo;
|
||||
|
||||
/// Membership filter set trait.
|
||||
pub trait InclusionFilter {
|
||||
/// Add element to the inclusion list.
|
||||
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError>;
|
||||
|
||||
/// Checks if the element belongs to the inclusion list. **There might be false positives.**
|
||||
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError>;
|
||||
|
||||
fn size(&self) -> usize;
|
||||
}
|
||||
|
||||
/// Enum with different [`InclusionFilter`] types.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
pub enum InclusionFilters {
|
||||
Cuckoo(Cuckoo),
|
||||
// Avoids irrefutable let errors. Remove this after adding another one.
|
||||
Unused,
|
||||
}
|
||||
|
||||
impl InclusionFilter for InclusionFilters {
|
||||
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
|
||||
match self {
|
||||
Self::Cuckoo(c) => c.add(element),
|
||||
Self::Unused => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
|
||||
match self {
|
||||
Self::Cuckoo(c) => c.contains(element),
|
||||
Self::Unused => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
match self {
|
||||
Self::Cuckoo(c) => c.size(),
|
||||
Self::Unused => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -21,12 +21,8 @@ use std::{
|
||||
ops::Range,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use sucds::{int_vectors::PrefixSummedEliasFano, Serializable};
|
||||
use tracing::*;
|
||||
|
||||
pub mod filter;
|
||||
use filter::{Cuckoo, InclusionFilter, InclusionFilters};
|
||||
|
||||
pub mod compression;
|
||||
#[cfg(test)]
|
||||
use compression::Compression;
|
||||
@ -37,6 +33,11 @@ use compression::Compressors;
|
||||
#[cfg_attr(test, derive(PartialEq, Eq))]
|
||||
pub enum Functions {}
|
||||
|
||||
/// empty enum for backwards compatibility
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[cfg_attr(test, derive(PartialEq, Eq))]
|
||||
pub enum InclusionFilters {}
|
||||
|
||||
mod error;
|
||||
pub use error::NippyJarError;
|
||||
|
||||
@ -78,8 +79,6 @@ impl<T> NippyJarHeader for T where
|
||||
///
|
||||
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
|
||||
/// entails consulting an offset list and fetching the data from file via `mmap`.
|
||||
/// Ultimately, the `freeze` function yields two files: a data file containing both the data and its
|
||||
/// configuration, and an index file that houses the offsets and `offsets_index`.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
pub struct NippyJar<H = ()> {
|
||||
@ -95,14 +94,11 @@ pub struct NippyJar<H = ()> {
|
||||
/// Optional compression algorithm applied to the data.
|
||||
compressor: Option<Compressors>,
|
||||
#[serde(skip)]
|
||||
/// Optional filter function for data membership checks.
|
||||
/// Optional field for backwards compatibility
|
||||
filter: Option<InclusionFilters>,
|
||||
#[serde(skip)]
|
||||
/// Optional field for backwards compatibility
|
||||
phf: Option<Functions>,
|
||||
/// Index mapping PHF output to value offsets in `offsets`.
|
||||
#[serde(skip)]
|
||||
offsets_index: PrefixSummedEliasFano,
|
||||
/// Maximum uncompressed row size of the set. This will enable decompression without any
|
||||
/// resizing of the output buffer.
|
||||
max_row_size: usize,
|
||||
@ -121,8 +117,6 @@ impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
|
||||
.field("compressor", &self.compressor)
|
||||
.field("filter", &self.filter)
|
||||
.field("phf", &self.phf)
|
||||
.field("offsets_index (len)", &self.offsets_index.len())
|
||||
.field("offsets_index (size in bytes)", &self.offsets_index.size_in_bytes())
|
||||
.field("path", &self.path)
|
||||
.field("max_row_size", &self.max_row_size)
|
||||
.finish_non_exhaustive()
|
||||
@ -139,11 +133,6 @@ impl NippyJar<()> {
|
||||
pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
|
||||
Self::load(path)
|
||||
}
|
||||
|
||||
/// Whether this [`NippyJar`] uses a [`InclusionFilters`] and [`Functions`].
|
||||
pub const fn uses_filters(&self) -> bool {
|
||||
self.filter.is_some() && self.phf.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: NippyJarHeader> NippyJar<H> {
|
||||
@ -158,7 +147,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
compressor: None,
|
||||
filter: None,
|
||||
phf: None,
|
||||
offsets_index: PrefixSummedEliasFano::default(),
|
||||
path: path.to_path_buf(),
|
||||
}
|
||||
}
|
||||
@ -176,12 +164,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds [`filter::Cuckoo`] filter.
|
||||
pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self {
|
||||
self.filter = Some(InclusionFilters::Cuckoo(Cuckoo::new(max_capacity)));
|
||||
self
|
||||
}
|
||||
|
||||
/// Gets a reference to the user header.
|
||||
pub const fn user_header(&self) -> &H {
|
||||
&self.user_header
|
||||
@ -197,16 +179,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
self.rows
|
||||
}
|
||||
|
||||
/// Returns the size of inclusion filter
|
||||
pub fn filter_size(&self) -> usize {
|
||||
self.size()
|
||||
}
|
||||
|
||||
/// Returns the size of offsets index
|
||||
pub fn offsets_index_size(&self) -> usize {
|
||||
self.offsets_index.size_in_bytes()
|
||||
}
|
||||
|
||||
/// Gets a reference to the compressor.
|
||||
pub const fn compressor(&self) -> Option<&Compressors> {
|
||||
self.compressor.as_ref()
|
||||
@ -217,8 +189,7 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
self.compressor.as_mut()
|
||||
}
|
||||
|
||||
/// Loads the file configuration and returns [`Self`] without deserializing filters related
|
||||
/// structures or the offset list.
|
||||
/// Loads the file configuration and returns [`Self`].
|
||||
///
|
||||
/// **The user must ensure the header type matches the one used during the jar's creation.**
|
||||
pub fn load(path: &Path) -> Result<Self, NippyJarError> {
|
||||
@ -232,16 +203,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
Ok(obj)
|
||||
}
|
||||
|
||||
/// Loads filters into memory.
|
||||
pub fn load_filters(&mut self) -> Result<(), NippyJarError> {
|
||||
// Read the offsets lists located at the index file.
|
||||
let mut offsets_file = File::open(self.index_path())?;
|
||||
self.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_file)?;
|
||||
self.phf = bincode::deserialize_from(&mut offsets_file)?;
|
||||
self.filter = bincode::deserialize_from(&mut offsets_file)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the path for the data file
|
||||
pub fn data_path(&self) -> &Path {
|
||||
self.path.as_ref()
|
||||
@ -306,20 +267,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
|
||||
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
|
||||
self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
|
||||
}
|
||||
|
||||
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
|
||||
self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
self.filter.as_ref().map(|f| f.size()).unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<H: NippyJarHeader> NippyJar<H> {
|
||||
/// If required, prepares any compression algorithm to an early pass of the data.
|
||||
@ -345,9 +292,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
|
||||
debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");
|
||||
|
||||
// Write phf, filter and offset index to file
|
||||
self.freeze_filters()?;
|
||||
|
||||
// Creates the writer, data and offsets file
|
||||
let mut writer = NippyJarWriter::new(self)?;
|
||||
|
||||
@ -362,18 +306,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
Ok(writer.into_jar())
|
||||
}
|
||||
|
||||
/// Freezes [`InclusionFilter`] and the offset index to file.
|
||||
fn freeze_filters(&self) -> Result<(), NippyJarError> {
|
||||
debug!(target: "nippy-jar", path=?self.index_path(), "Writing offsets and offsets index to file.");
|
||||
|
||||
let mut file = File::create(self.index_path())?;
|
||||
self.offsets_index.serialize_into(&mut file)?;
|
||||
bincode::serialize_into(&mut file, &self.phf)?;
|
||||
bincode::serialize_into(&mut file, &self.filter)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safety checks before creating and returning a [`File`] handle to write data to.
|
||||
fn check_before_freeze(
|
||||
&self,
|
||||
@ -553,51 +485,6 @@ mod tests {
|
||||
assert_eq!(jar, read_jar);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter() {
|
||||
let (col1, col2) = test_data(Some(1));
|
||||
let num_columns = 2;
|
||||
let num_rows = col1.len() as u64;
|
||||
let file_path = tempfile::NamedTempFile::new().unwrap();
|
||||
|
||||
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path());
|
||||
|
||||
assert!(matches!(
|
||||
InclusionFilter::add(&mut nippy, &col1[0]),
|
||||
Err(NippyJarError::FilterMissing)
|
||||
));
|
||||
|
||||
nippy = nippy.with_cuckoo_filter(4);
|
||||
|
||||
// Add col1[0]
|
||||
assert!(!InclusionFilter::contains(&nippy, &col1[0]).unwrap());
|
||||
assert!(InclusionFilter::add(&mut nippy, &col1[0]).is_ok());
|
||||
assert!(InclusionFilter::contains(&nippy, &col1[0]).unwrap());
|
||||
|
||||
// Add col1[1]
|
||||
assert!(!InclusionFilter::contains(&nippy, &col1[1]).unwrap());
|
||||
assert!(InclusionFilter::add(&mut nippy, &col1[1]).is_ok());
|
||||
assert!(InclusionFilter::contains(&nippy, &col1[1]).unwrap());
|
||||
|
||||
// // Add more columns until max_capacity
|
||||
assert!(InclusionFilter::add(&mut nippy, &col1[2]).is_ok());
|
||||
assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok());
|
||||
|
||||
let nippy = nippy
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
.unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
assert!(InclusionFilter::contains(&loaded_nippy, &col1[0]).unwrap());
|
||||
assert!(InclusionFilter::contains(&loaded_nippy, &col1[1]).unwrap());
|
||||
assert!(InclusionFilter::contains(&loaded_nippy, &col1[2]).unwrap());
|
||||
assert!(InclusionFilter::contains(&loaded_nippy, &col1[3]).unwrap());
|
||||
assert!(!InclusionFilter::contains(&loaded_nippy, &col1[4]).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_zstd_with_dictionaries() {
|
||||
let (col1, col2) = test_data(None);
|
||||
@ -646,13 +533,11 @@ mod tests {
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
.unwrap();
|
||||
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
assert_eq!(nippy.version, loaded_nippy.version);
|
||||
assert_eq!(nippy.columns, loaded_nippy.columns);
|
||||
assert_eq!(nippy.filter, loaded_nippy.filter);
|
||||
assert_eq!(nippy.phf, loaded_nippy.phf);
|
||||
assert_eq!(nippy.offsets_index, loaded_nippy.offsets_index);
|
||||
assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
|
||||
assert_eq!(nippy.path, loaded_nippy.path);
|
||||
|
||||
@ -691,8 +576,7 @@ mod tests {
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
.unwrap();
|
||||
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
|
||||
@ -730,8 +614,7 @@ mod tests {
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
.unwrap();
|
||||
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
|
||||
@ -753,7 +636,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests `NippyJar` with everything enabled: compression, filter, offset list and offset index.
|
||||
/// Tests `NippyJar` with everything enabled.
|
||||
#[test]
|
||||
fn test_full_nippy_jar() {
|
||||
let (col1, col2) = test_data(None);
|
||||
@ -773,8 +656,7 @@ mod tests {
|
||||
{
|
||||
let mut nippy =
|
||||
NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
|
||||
.with_zstd(true, 5000)
|
||||
.with_cuckoo_filter(col1.len());
|
||||
.with_zstd(true, 5000);
|
||||
|
||||
nippy.prepare_compression(data.clone()).unwrap();
|
||||
nippy
|
||||
@ -784,11 +666,9 @@ mod tests {
|
||||
|
||||
// Read file
|
||||
{
|
||||
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
|
||||
|
||||
assert!(loaded_nippy.compressor().is_some());
|
||||
assert!(loaded_nippy.filter.is_some());
|
||||
assert_eq!(loaded_nippy.user_header().block_start, block_start);
|
||||
|
||||
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
|
||||
@ -827,10 +707,8 @@ mod tests {
|
||||
|
||||
// Create file
|
||||
{
|
||||
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path())
|
||||
.with_zstd(true, 5000)
|
||||
.with_cuckoo_filter(col1.len());
|
||||
|
||||
let mut nippy =
|
||||
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
|
||||
nippy.prepare_compression(data).unwrap();
|
||||
nippy
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
@ -839,8 +717,7 @@ mod tests {
|
||||
|
||||
// Read file
|
||||
{
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
|
||||
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
|
||||
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
|
||||
|
||||
@ -112,9 +112,6 @@ pub struct StaticFileProviderInner {
|
||||
static_files_tx_index: RwLock<SegmentRanges>,
|
||||
/// Directory where `static_files` are located
|
||||
path: PathBuf,
|
||||
/// Whether [`StaticFileJarProvider`] loads filters into memory. If not, `by_hash` queries
|
||||
/// won't be able to be queried directly.
|
||||
load_filters: bool,
|
||||
/// Maintains a writer set of [`StaticFileSegment`].
|
||||
writers: StaticFileWriters,
|
||||
metrics: Option<Arc<StaticFileProviderMetrics>>,
|
||||
@ -139,7 +136,6 @@ impl StaticFileProviderInner {
|
||||
static_files_max_block: Default::default(),
|
||||
static_files_tx_index: Default::default(),
|
||||
path: path.as_ref().to_path_buf(),
|
||||
load_filters: false,
|
||||
metrics: None,
|
||||
access,
|
||||
_lock_file,
|
||||
@ -154,14 +150,6 @@ impl StaticFileProviderInner {
|
||||
}
|
||||
|
||||
impl StaticFileProvider {
|
||||
/// Loads filters into memory when creating a [`StaticFileJarProvider`].
|
||||
pub fn with_filters(self) -> Self {
|
||||
let mut provider =
|
||||
Arc::try_unwrap(self.0).expect("should be called when initializing only");
|
||||
provider.load_filters = true;
|
||||
Self(Arc::new(provider))
|
||||
}
|
||||
|
||||
/// Enables metrics on the [`StaticFileProvider`].
|
||||
pub fn with_metrics(self) -> Self {
|
||||
let mut provider =
|
||||
@ -298,14 +286,8 @@ impl StaticFileProvider {
|
||||
let jar = if let Some((_, jar)) = self.map.remove(&key) {
|
||||
jar.jar
|
||||
} else {
|
||||
let mut jar = NippyJar::<SegmentHeader>::load(
|
||||
&self.path.join(segment.filename(&fixed_block_range)),
|
||||
)
|
||||
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
if self.load_filters {
|
||||
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
}
|
||||
jar
|
||||
NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
|
||||
.map_err(|e| ProviderError::NippyJar(e.to_string()))?
|
||||
};
|
||||
|
||||
jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
@ -337,12 +319,7 @@ impl StaticFileProvider {
|
||||
} else {
|
||||
trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
|
||||
let path = self.path.join(segment.filename(fixed_block_range));
|
||||
let mut jar =
|
||||
NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
if self.load_filters {
|
||||
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
}
|
||||
|
||||
let jar = NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
|
||||
self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user