chore: remove unused static-file code (#9178)

joshieDo
2024-07-01 17:24:51 +02:00
committed by GitHub
parent cf8a9163af
commit 9d4722eb65
11 changed files with 95 additions and 498 deletions

View File

@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment, SegmentHeader};
+use crate::segments::Segment;
use alloy_primitives::BlockNumber;
-use reth_db::{static_file::create_static_file_T1_T2_T3, tables, RawKey, RawTable};
+use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
DatabaseProviderRO,
};
-use reth_static_file_types::{SegmentConfig, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Headers`] part of data.
#[derive(Debug, Default)]
@@ -56,73 +56,4 @@ impl<DB: Database> Segment<DB> for Headers {
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let range_len = block_range.clone().count();
let jar = prepare_jar::<DB, 3>(
provider,
directory,
StaticFileSegment::Headers,
config,
block_range.clone(),
range_len,
|| {
Ok([
dataset_for_compression::<DB, tables::Headers>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::HeaderTerminalDifficulties>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::CanonicalHeaders>(
provider,
&block_range,
range_len,
)?,
])
},
)?;
// Generate list of hashes for filters & PHF
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let hashes = if config.filters.has_filters() {
Some(
cursor
.walk(Some(RawKey::from(*block_range.start())))?
.take(range_len)
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
)
} else {
None
};
create_static_file_T1_T2_T3::<
tables::Headers,
tables::HeaderTerminalDifficulties,
tables::CanonicalHeaders,
BlockNumber,
SegmentHeader,
>(
provider.tx_ref(),
block_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
range_len,
jar,
)?;
Ok(())
}
}
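With `create_static_file_file` gone, the Headers segment is only written through the `StaticFileWriter` append path. A minimal sketch of that path, modeled on the updated test at the end of this commit (the helper name, directory argument, and exact import paths are illustrative, not part of the diff):

use alloy_primitives::U256;
use reth_primitives::SealedHeader;
use reth_provider::providers::{StaticFileProvider, StaticFileWriter};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::path::Path;

// Hypothetical helper: append a batch of sealed headers to the latest Headers
// static file and commit, the same way the updated test below does.
fn write_headers(dir: &Path, headers: Vec<SealedHeader>) -> ProviderResult<()> {
    let manager = StaticFileProvider::read_write(dir)?;
    let mut writer = manager.latest_writer(StaticFileSegment::Headers)?;
    let mut td = U256::ZERO;
    for header in headers {
        // Each row stores the header, the running total difficulty and the block hash.
        td += header.header().difficulty;
        let hash = header.hash();
        writer.append_header(header.unseal(), td, hash)?;
    }
    writer.commit()?;
    Ok(())
}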

View File

@@ -10,20 +10,11 @@ mod receipts;
pub use receipts::Receipts;
use alloy_primitives::BlockNumber;
-use reth_db::{RawKey, RawTable};
-use reth_db_api::{cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx};
-use reth_nippy_jar::NippyJar;
-use reth_provider::{
-providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
-};
-use reth_static_file_types::{
-find_fixed_range, Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig,
-SegmentHeader, StaticFileSegment,
-};
+use reth_db_api::database::Database;
+use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO};
+use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;
-pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

/// A segment represents moving some portion of the data to static files.
pub trait Segment<DB: Database>: Send + Sync {
@@ -38,80 +29,4 @@ pub trait Segment<DB: Database>: Send + Sync {
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
/// Create a static file of data for the provided block range. The `directory` parameter
/// determines the static file's save location.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
}
/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
/// determines the static file's save location.
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<DB>,
directory: impl AsRef<Path>,
segment: StaticFileSegment,
segment_config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> ProviderResult<Rows<COLUMNS>>,
) -> ProviderResult<NippyJar<SegmentHeader>> {
let tx_range = match segment {
StaticFileSegment::Headers => None,
StaticFileSegment::Receipts | StaticFileSegment::Transactions => {
Some(provider.transaction_range_by_block_range(block_range.clone())?.into())
}
};
let mut nippy_jar = NippyJar::new(
COLUMNS,
&directory.as_ref().join(segment.filename(&find_fixed_range(*block_range.end())).as_str()),
SegmentHeader::new(block_range.clone().into(), Some(block_range.into()), tx_range, segment),
);
nippy_jar = match segment_config.compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar
.prepare_compression(dataset.to_vec())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};
if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
nippy_jar = match phf {
PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
};
}
Ok(nippy_jar)
}
/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
pub(crate) fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
provider: &DatabaseProviderRO<DB>,
range: &RangeInclusive<u64>,
range_len: usize,
) -> ProviderResult<Vec<Vec<u8>>> {
let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;
Ok(cursor
.walk_back(Some(RawKey::from(*range.end())))?
.take(range_len.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>())
}
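With `prepare_jar`, `dataset_for_compression`, and the `Rows` alias removed, the only entry point a segment still implements is the copy-to-static-files method whose tail is visible in the context lines above. Roughly, the surviving trait looks like this sketch (the `segment()` accessor and the exact ownership of `provider` are assumptions reconstructed from context, not shown in this diff):

/// A segment represents moving some portion of the data to static files.
pub trait Segment<DB: Database>: Send + Sync {
    /// Returns the [`StaticFileSegment`] this segment is responsible for.
    fn segment(&self) -> StaticFileSegment;

    /// Moves the data for `block_range` out of the database, appending it to
    /// static files through the given [`StaticFileProvider`].
    fn copy_to_static_files(
        &self,
        provider: DatabaseProviderRO<DB>,
        static_file_provider: StaticFileProvider,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()>;
}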

View File

@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment};
+use crate::segments::Segment;
-use alloy_primitives::{BlockNumber, TxNumber};
+use alloy_primitives::BlockNumber;
-use reth_db::{static_file::create_static_file_T1, tables};
+use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
-BlockReader, DatabaseProviderRO, TransactionsProviderExt,
+BlockReader, DatabaseProviderRO,
};
-use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Receipts`] part of data.
#[derive(Debug, Default)]
@@ -47,56 +47,4 @@ impl<DB: Database> Segment<DB> for Receipts {
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Receipts,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Receipts>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let hashes = if config.filters.has_filters() {
Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
)
} else {
None
};
create_static_file_T1::<tables::Receipts, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;
Ok(())
}
}

View File

@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment};
+use crate::segments::Segment;
-use alloy_primitives::{BlockNumber, TxNumber};
+use alloy_primitives::BlockNumber;
-use reth_db::{static_file::create_static_file_T1, tables};
+use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
-BlockReader, DatabaseProviderRO, TransactionsProviderExt,
+BlockReader, DatabaseProviderRO,
};
-use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Transactions`] part of data.
#[derive(Debug, Default)]
@@ -53,56 +53,4 @@ impl<DB: Database> Segment<DB> for Transactions {
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Transactions,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Transactions>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let hashes = if config.filters.has_filters() {
Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
)
} else {
None
};
create_static_file_T1::<tables::Transactions, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;
Ok(())
}
}

View File

@@ -1,115 +0,0 @@
use crate::{RawKey, RawTable};
use reth_db_api::{
cursor::DbCursorRO,
table::{Key, Table},
transaction::DbTx,
};
use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_tracing::tracing::*;
use std::{error::Error as StdError, ops::RangeInclusive};
/// Macro that generates static file creation functions that take an arbitrary number of [`Table`]
/// and creates a [`NippyJar`] file out of their [`Table::Value`]. Each list of [`Table::Value`]
/// from a table is a column of values.
///
/// Has membership filter set and compression dictionary support.
macro_rules! generate_static_file_func {
($(($($tbl:ident),+)),+ $(,)? ) => {
$(
paste::item! {
/// Creates a static file from specified tables. Each table's `Value` iterator represents a column.
///
/// **Ensure the range contains the same number of rows.**
///
/// * `tx`: Database transaction.
/// * `range`: Data range for columns in tables.
/// * `additional`: Additional columns which can't be straightforwardly walked on.
/// * `keys`: IntoIterator of keys (eg. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`.
/// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent.
/// * `row_count`: Total rows to add to `NippyJar`. Must match row count in `range`.
/// * `nippy_jar`: Static File object responsible for file generation.
#[allow(non_snake_case)]
pub fn [<create_static_file$(_ $tbl)+>]<
$($tbl: Table<Key=K>,)+
K,
H: NippyJarHeader
>
(
tx: &impl DbTx,
range: RangeInclusive<K>,
additional: Option<Vec<Box<dyn Iterator<Item = Result<Vec<u8>, Box<dyn StdError + Send + Sync>>>>>>,
dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
row_count: usize,
mut nippy_jar: NippyJar<H>
) -> ProviderResult<()>
where K: Key + Copy
{
let additional = additional.unwrap_or_default();
debug!(target: "reth::static_file", ?range, "Creating static file {:?} and {} more columns.", vec![$($tbl::NAME,)+], additional.len());
let range: RangeInclusive<RawKey<K>> = RawKey::new(*range.start())..=RawKey::new(*range.end());
// Create PHF and Filter if required
if let Some(keys) = keys {
debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list");
match nippy_jar.prepare_index(keys, row_count) {
Ok(_) => {
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}
// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
debug!(target: "reth::static_file", "Creating compression dictionaries.");
match nippy_jar.prepare_compression(data_sets){
Ok(_) => {
debug!(target: "reth::static_file", "Compression dictionaries created.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}
// Creates the cursors for the columns
$(
let mut [< $tbl _cursor>] = tx.cursor_read::<RawTable<$tbl>>()?;
let [< $tbl _iter>] = [< $tbl _cursor>]
.walk_range(range.clone())?
.into_iter()
.map(|row|
row
.map(|(_key, val)| val.into_value())
.map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>)
);
)+
// Create the static file from the data
let col_iterators: Vec<Box<dyn Iterator<Item = Result<Vec<u8>,_>>>> = vec![
$(Box::new([< $tbl _iter>]),)+
];
debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file.");
let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string()));
debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated.");
Ok(())
}
}
)+
};
}
generate_static_file_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);
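For reference, the single-table arm of this macro expanded to roughly the following free function (simplified: the filter/PHF and dictionary-training branches, and their parameters, are omitted here; the sketch relies on the same imports as the deleted module above):

#[allow(non_snake_case)]
pub fn create_static_file_T1<T1: Table<Key = K>, K, H: NippyJarHeader>(
    tx: &impl DbTx,
    range: RangeInclusive<K>,
    additional: Option<Vec<Box<dyn Iterator<Item = Result<Vec<u8>, Box<dyn StdError + Send + Sync>>>>>>,
    row_count: usize,
    nippy_jar: NippyJar<H>,
) -> ProviderResult<()>
where
    K: Key + Copy,
{
    let additional = additional.unwrap_or_default();
    let range = RawKey::new(*range.start())..=RawKey::new(*range.end());

    // One read-only cursor per table; the table's value column becomes one jar column.
    let mut T1_cursor = tx.cursor_read::<RawTable<T1>>()?;
    let T1_iter = T1_cursor
        .walk_range(range)?
        .map(|row| row.map(|(_key, val)| val.into_value()).map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>));

    // Freeze the collected columns (plus any `additional` ones) into the static file.
    let columns: Vec<Box<dyn Iterator<Item = Result<Vec<u8>, _>>>> = vec![Box::new(T1_iter)];
    nippy_jar
        .freeze(columns.into_iter().chain(additional).collect(), row_count as u64)
        .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
    Ok(())
}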

View File

@@ -1,13 +1,10 @@
//! reth's static file database table import and access

-mod generation;
use std::{
collections::{hash_map::Entry, HashMap},
path::Path,
};
-pub use generation::*;

mod cursor;
pub use cursor::StaticFileCursor;

View File

@@ -30,6 +30,7 @@ pub trait Compression: Serialize + for<'a> Deserialize<'a> {
true
}

+#[cfg(test)]
/// If required, prepares compression algorithm with an early pass on the data.
fn prepare_compression(
&mut self,
@@ -95,6 +96,7 @@ impl Compression for Compressors {
}
}

+#[cfg(test)]
fn prepare_compression(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,

View File

@@ -185,6 +185,7 @@ impl Compression for Zstd {
matches!(self.state, ZstdState::Ready)
}

+#[cfg(test)]
/// If using it with dictionaries, prepares a dictionary for each column.
fn prepare_compression(
&mut self,
@@ -208,7 +209,6 @@ impl Compression for Zstd {
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
}

-// TODO: parallel calculation
let mut dictionaries = vec![];
for column in columns {
// ZSTD requires all training data to be continuous in memory, alongside the size of
@@ -273,6 +273,7 @@ impl<'a> std::fmt::Debug for ZstdDictionaries<'a> {
}

impl<'a> ZstdDictionaries<'a> {
+#[cfg(test)]
/// Creates [`ZstdDictionaries`].
pub(crate) fn new(raw: Vec<RawDictionary>) -> Self {
Self(raw.into_iter().map(ZstdDictionary::Raw).collect())
@@ -315,6 +316,7 @@ impl<'a> ZstdDictionaries<'a> {
/// A Zstd dictionary. It's created and serialized with [`ZstdDictionary::Raw`], and deserialized as
/// [`ZstdDictionary::Loaded`].
pub(crate) enum ZstdDictionary<'a> {
+#[allow(dead_code)]
Raw(RawDictionary),
Loaded(DecoderDictionary<'a>),
}
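For context, the per-column work that `prepare_compression` does (and which this commit now compiles only under `cfg(test)`) is training one zstd dictionary per column from sample rows laid out contiguously in memory, as the comment above notes. A minimal sketch of that step with the zstd crate, where the function name, sample slice, and `max_dict_size` are illustrative:

use zstd::dict::from_continuous;

// Train a dictionary for one column from its sample rows.
fn train_column_dictionary(rows: &[Vec<u8>], max_dict_size: usize) -> std::io::Result<Vec<u8>> {
    // zstd wants the training data contiguous, plus the length of each sample.
    let sizes: Vec<usize> = rows.iter().map(|row| row.len()).collect();
    let continuous: Vec<u8> = rows.concat();
    from_continuous(&continuous, &sizes, max_dict_size)
}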

View File

@@ -67,7 +67,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
self.row = 0;
}

-/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`].
+/// Returns a row, searching it by a key.
///
/// **May return false positives.**
///
@@ -130,7 +130,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
))
}

-/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a
+/// Returns a row, searching it by a key using a
/// `mask` to only read certain columns from the row.
///
/// **May return false positives.**

View File

@@ -28,7 +28,9 @@ pub mod filter;
use filter::{Cuckoo, InclusionFilter, InclusionFilters};

pub mod compression;
-use compression::{Compression, Compressors};
+#[cfg(test)]
+use compression::Compression;
+use compression::Compressors;

pub mod phf;
pub use phf::PHFKey;
@@ -306,6 +308,56 @@ impl<H: NippyJarHeader> NippyJar<H> {
DataReader::new(self.data_path())
}
/// Writes all necessary configuration to file.
fn freeze_config(&self) -> Result<(), NippyJarError> {
// Atomic writes are hard: <https://github.com/paradigmxyz/reth/issues/8622>
let mut tmp_path = self.config_path();
tmp_path.set_extension(".tmp");
// Write to temporary file
let mut file = File::create(&tmp_path)?;
bincode::serialize_into(&mut file, &self)?;
// fsync() file
file.sync_all()?;
// Rename file, not move
reth_fs_util::rename(&tmp_path, self.config_path())?;
// fsync() dir
if let Some(parent) = tmp_path.parent() {
OpenOptions::new().read(true).open(parent)?.sync_all()?;
}
Ok(())
}
}
impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
}
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
}
fn size(&self) -> usize {
self.filter.as_ref().map(|f| f.size()).unwrap_or(0)
}
}
impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key)
}
}
#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
/// If required, prepares any compression algorithm to an early pass of the data.
pub fn prepare_compression(
&mut self,
@@ -429,53 +481,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
Ok(())
}
/// Writes all necessary configuration to file.
fn freeze_config(&self) -> Result<(), NippyJarError> {
// Atomic writes are hard: <https://github.com/paradigmxyz/reth/issues/8622>
let mut tmp_path = self.config_path();
tmp_path.set_extension(".tmp");
// Write to temporary file
let mut file = File::create(&tmp_path)?;
bincode::serialize_into(&mut file, &self)?;
// fsync() file
file.sync_all()?;
// Rename file, not move
reth_fs_util::rename(&tmp_path, self.config_path())?;
// fsync() dir
if let Some(parent) = tmp_path.parent() {
OpenOptions::new().read(true).open(parent)?.sync_all()?;
}
Ok(())
}
}
impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
}
fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
}
fn size(&self) -> usize {
self.filter.as_ref().map(|f| f.size()).unwrap_or(0)
}
}
impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
}
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key)
}
}

/// Manages the reading of static file data using memory-mapped files.
@@ -581,6 +586,7 @@ impl DataReader {
#[cfg(test)]
mod tests {
use super::*;
+use compression::Compression;
use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
use std::{collections::HashSet, fs::OpenOptions};
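The `freeze_config` method moved in the hunks above follows the standard crash-safe file replacement recipe: write to a temporary file, fsync it, rename it over the target, then fsync the parent directory so the rename is durable. As a generic standalone sketch (the function name and the use of std::fs::rename in place of reth_fs_util::rename are illustrative):

use std::{
    fs::{File, OpenOptions},
    io::Write,
    path::Path,
};

fn write_atomically(target: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = target.with_extension("tmp");

    // Write the payload to a temporary file and fsync it.
    let mut file = File::create(&tmp)?;
    file.write_all(bytes)?;
    file.sync_all()?;

    // Atomically replace the target (same filesystem, so rename is atomic).
    std::fs::rename(&tmp, target)?;

    // fsync the directory so the rename itself survives a crash.
    if let Some(parent) = target.parent() {
        OpenOptions::new().read(true).open(parent)?.sync_all()?;
    }
    Ok(())
}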

View File

@@ -59,29 +59,16 @@ mod tests {
use super::*;
use crate::{test_utils::create_test_provider_factory, HeaderProvider};
use rand::seq::SliceRandom;
-use reth_db::{
-static_file::create_static_file_T1_T2_T3, CanonicalHeaders, HeaderNumbers,
-HeaderTerminalDifficulties, Headers, RawTable,
-};
-use reth_db_api::{
-cursor::DbCursorRO,
-transaction::{DbTx, DbTxMut},
-};
-use reth_primitives::{static_file::find_fixed_range, BlockNumber, B256, U256};
+use reth_db::{CanonicalHeaders, HeaderNumbers, HeaderTerminalDifficulties, Headers};
+use reth_db_api::transaction::DbTxMut;
+use reth_primitives::{static_file::find_fixed_range, B256, U256};
use reth_testing_utils::generators::{self, random_header_range};
-use std::vec::IntoIter;

#[test]
fn test_snap() {
// Ranges
let row_count = 100u64;
let range = 0..=(row_count - 1);
-let segment_header = SegmentHeader::new(
-range.clone().into(),
-Some(range.clone().into()),
-Some(range.clone().into()),
-StaticFileSegment::Headers,
-);

// Data sources
let factory = create_test_provider_factory();
@@ -113,46 +100,22 @@
// Create StaticFile
{
-let with_compression = true;
-let with_filter = true;
-let mut nippy_jar = NippyJar::new(3, static_file.as_path(), segment_header);
-if with_compression {
-nippy_jar = nippy_jar.with_zstd(false, 0);
-}
-if with_filter {
-nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
-}
-let provider = factory.provider().unwrap();
-let tx = provider.tx_ref();
-let none_vec: Option<Vec<IntoIter<Vec<u8>>>> = None;
-// Generate list of hashes for filters & PHF
-let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>().unwrap();
-let hashes = cursor
-.walk(None)
-.unwrap()
-.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
-create_static_file_T1_T2_T3::<
-Headers,
-HeaderTerminalDifficulties,
-CanonicalHeaders,
-BlockNumber,
-SegmentHeader,
->(tx, range, None, none_vec, Some(hashes), row_count as usize, nippy_jar)
-.unwrap();
+let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
+let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
+let mut td = U256::ZERO;
+for header in headers.clone() {
+td += header.header().difficulty;
+let hash = header.hash();
+writer.append_header(header.unseal(), td, hash).unwrap();
+}
+writer.commit().unwrap();
}

// Use providers to query Header data and compare if it matches
{
let db_provider = factory.provider().unwrap();
-let manager =
-StaticFileProvider::read_write(static_files_path.path()).unwrap().with_filters();
+let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
let jar_provider = manager
.get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
.unwrap();
@@ -168,12 +131,12 @@
// Compare Header
assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
-assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
+assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());

// Compare HeaderTerminalDifficulties
assert_eq!(
db_provider.header_td(&header_hash).unwrap().unwrap(),
-jar_provider.header_td(&header_hash).unwrap().unwrap()
+jar_provider.header_td_by_number(header.number).unwrap().unwrap()
);
}
}