diff --git a/crates/static-file/static-file/src/segments/headers.rs b/crates/static-file/static-file/src/segments/headers.rs
index e87c1fdc5..5824d1d1a 100644
--- a/crates/static-file/static-file/src/segments/headers.rs
+++ b/crates/static-file/static-file/src/segments/headers.rs
@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment, SegmentHeader};
+use crate::segments::Segment;
 use alloy_primitives::BlockNumber;
-use reth_db::{static_file::create_static_file_T1_T2_T3, tables, RawKey, RawTable};
+use reth_db::tables;
 use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileWriter},
     DatabaseProviderRO,
 };
-use reth_static_file_types::{SegmentConfig, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
 use reth_storage_errors::provider::ProviderResult;
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;
 
 /// Static File segment responsible for [`StaticFileSegment::Headers`] part of data.
 #[derive(Debug, Default)]
@@ -56,73 +56,4 @@ impl<DB: Database> Segment<DB> for Headers {
 
         Ok(())
     }
-
-    fn create_static_file_file(
-        &self,
-        provider: &DatabaseProviderRO<DB>,
-        directory: &Path,
-        config: SegmentConfig,
-        block_range: RangeInclusive<BlockNumber>,
-    ) -> ProviderResult<()> {
-        let range_len = block_range.clone().count();
-        let jar = prepare_jar::<DB, 3>(
-            provider,
-            directory,
-            StaticFileSegment::Headers,
-            config,
-            block_range.clone(),
-            range_len,
-            || {
-                Ok([
-                    dataset_for_compression::<DB, tables::Headers>(
-                        provider,
-                        &block_range,
-                        range_len,
-                    )?,
-                    dataset_for_compression::<DB, tables::HeaderTerminalDifficulties>(
-                        provider,
-                        &block_range,
-                        range_len,
-                    )?,
-                    dataset_for_compression::<DB, tables::CanonicalHeaders>(
-                        provider,
-                        &block_range,
-                        range_len,
-                    )?,
-                ])
-            },
-        )?;
-
-        // Generate list of hashes for filters & PHF
-        let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
-        let hashes = if config.filters.has_filters() {
-            Some(
-                cursor
-                    .walk(Some(RawKey::from(*block_range.start())))?
-                    .take(range_len)
-                    .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
-            )
-        } else {
-            None
-        };
-
-        create_static_file_T1_T2_T3::<
-            tables::Headers,
-            tables::HeaderTerminalDifficulties,
-            tables::CanonicalHeaders,
-            BlockNumber,
-            SegmentHeader,
-        >(
-            provider.tx_ref(),
-            block_range,
-            None,
-            // We already prepared the dictionary beforehand
-            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
-            hashes,
-            range_len,
-            jar,
-        )?;
-
-        Ok(())
-    }
 }
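The removed `create_static_file_file` above was the Headers half of the offline NippyJar build (dictionary training, cuckoo filter, PHF). What survives is the writer-based copy path, whose body sits above the removed hunk and is not shown. Below is a hedged sketch of that flow, inferred from the imports this file keeps and from the rewritten test at the end of this diff; the function name, the cursor plumbing, and the `.0` unwrapping of the stored compact difficulty are assumptions, not quoted source:

```rust
use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
    providers::{StaticFileProvider, StaticFileWriter},
    DatabaseProviderRO,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;

/// Hypothetical stand-in for the surviving copy method.
fn copy_headers_to_static_file<DB: Database>(
    provider: &DatabaseProviderRO<DB>,
    static_file_provider: StaticFileProvider,
    block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
    let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;

    // One cursor per header table; the three walkers advance in lockstep,
    // one row per block, matching the three columns of the Headers segment.
    let mut headers = provider.tx_ref().cursor_read::<tables::Headers>()?;
    let mut tds = provider.tx_ref().cursor_read::<tables::HeaderTerminalDifficulties>()?;
    let mut hashes = provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?;

    for ((header, td), hash) in headers
        .walk_range(block_range.clone())?
        .zip(tds.walk_range(block_range.clone())?)
        .zip(hashes.walk_range(block_range)?)
    {
        let (_, header) = header?;
        let (_, td) = td?; // stored as a compact wrapper around U256
        let (_, hash) = hash?;
        writer.append_header(header, td.0, hash)?;
    }
    Ok(())
}
```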
diff --git a/crates/static-file/static-file/src/segments/mod.rs b/crates/static-file/static-file/src/segments/mod.rs
index 77798dd08..1125b2085 100644
--- a/crates/static-file/static-file/src/segments/mod.rs
+++ b/crates/static-file/static-file/src/segments/mod.rs
@@ -10,20 +10,11 @@ mod receipts;
 pub use receipts::Receipts;
 
 use alloy_primitives::BlockNumber;
-use reth_db::{RawKey, RawTable};
-use reth_db_api::{cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx};
-use reth_nippy_jar::NippyJar;
-use reth_provider::{
-    providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
-};
-use reth_static_file_types::{
-    find_fixed_range, Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig,
-    SegmentHeader, StaticFileSegment,
-};
+use reth_db_api::database::Database;
+use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO};
+use reth_static_file_types::StaticFileSegment;
 use reth_storage_errors::provider::ProviderResult;
-use std::{ops::RangeInclusive, path::Path};
-
-pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
+use std::ops::RangeInclusive;
 
 /// A segment represents moving some portion of the data to static files.
 pub trait Segment<DB: Database>: Send + Sync {
@@ -38,80 +29,4 @@ pub trait Segment<DB: Database>: Send + Sync {
         static_file_provider: StaticFileProvider,
         block_range: RangeInclusive<BlockNumber>,
     ) -> ProviderResult<()>;
-
-    /// Create a static file of data for the provided block range. The `directory` parameter
-    /// determines the static file's save location.
-    fn create_static_file_file(
-        &self,
-        provider: &DatabaseProviderRO<DB>,
-        directory: &Path,
-        config: SegmentConfig,
-        block_range: RangeInclusive<BlockNumber>,
-    ) -> ProviderResult<()>;
-}
-
-/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
-/// determines the static file's save location.
-pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
-    provider: &DatabaseProviderRO<DB>,
-    directory: impl AsRef<Path>,
-    segment: StaticFileSegment,
-    segment_config: SegmentConfig,
-    block_range: RangeInclusive<BlockNumber>,
-    total_rows: usize,
-    prepare_compression: impl Fn() -> ProviderResult<Rows<COLUMNS>>,
-) -> ProviderResult<NippyJar<SegmentHeader>> {
-    let tx_range = match segment {
-        StaticFileSegment::Headers => None,
-        StaticFileSegment::Receipts | StaticFileSegment::Transactions => {
-            Some(provider.transaction_range_by_block_range(block_range.clone())?.into())
-        }
-    };
-
-    let mut nippy_jar = NippyJar::new(
-        COLUMNS,
-        &directory.as_ref().join(segment.filename(&find_fixed_range(*block_range.end())).as_str()),
-        SegmentHeader::new(block_range.clone().into(), Some(block_range.into()), tx_range, segment),
-    );
-
-    nippy_jar = match segment_config.compression {
-        Compression::Lz4 => nippy_jar.with_lz4(),
-        Compression::Zstd => nippy_jar.with_zstd(false, 0),
-        Compression::ZstdWithDictionary => {
-            let dataset = prepare_compression()?;
-
-            nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
-            nippy_jar
-                .prepare_compression(dataset.to_vec())
-                .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
-            nippy_jar
-        }
-        Compression::Uncompressed => nippy_jar,
-    };
-
-    if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
-        nippy_jar = match inclusion_filter {
-            InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
-        };
-        nippy_jar = match phf {
-            PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
-            PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
-        };
-    }
-
-    Ok(nippy_jar)
-}
-
-/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
-pub(crate) fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
-    provider: &DatabaseProviderRO<DB>,
-    range: &RangeInclusive<u64>,
-    range_len: usize,
-) -> ProviderResult<Vec<Vec<u8>>> {
-    let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;
-    Ok(cursor
-        .walk_back(Some(RawKey::from(*range.end())))?
-        .take(range_len.min(1000))
-        .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
-        .collect::<Vec<_>>())
 }
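With `create_static_file_file`, `prepare_jar`, and `dataset_for_compression` gone, the `Segment` trait is down to the copy method whose tail is visible in the hunk context above. A sketch of how the slimmed trait presumably reads after this diff; the elided method names (`segment`, `copy_to_static_files`) and the exact receiver style are assumptions based on the retained imports, not quoted source:

```rust
use alloy_primitives::BlockNumber;
use reth_db_api::database::Database;
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;

/// A segment represents moving some portion of the data to static files.
pub trait Segment<DB: Database>: Send + Sync {
    /// Returns the [`StaticFileSegment`] this implementation handles
    /// (the retained `StaticFileSegment` import suggests such a method).
    fn segment(&self) -> StaticFileSegment;

    /// Copies `block_range` from the database to the segment's static file,
    /// writing through the provider's managed writer.
    fn copy_to_static_files(
        &self,
        provider: &DatabaseProviderRO<DB>,
        static_file_provider: StaticFileProvider,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()>;
}
```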
diff --git a/crates/static-file/static-file/src/segments/receipts.rs b/crates/static-file/static-file/src/segments/receipts.rs
index e09b5e690..5548e9f99 100644
--- a/crates/static-file/static-file/src/segments/receipts.rs
+++ b/crates/static-file/static-file/src/segments/receipts.rs
@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment};
-use alloy_primitives::{BlockNumber, TxNumber};
-use reth_db::{static_file::create_static_file_T1, tables};
+use crate::segments::Segment;
+use alloy_primitives::BlockNumber;
+use reth_db::tables;
 use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileWriter},
-    BlockReader, DatabaseProviderRO, TransactionsProviderExt,
+    BlockReader, DatabaseProviderRO,
 };
-use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
 use reth_storage_errors::provider::{ProviderError, ProviderResult};
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;
 
 /// Static File segment responsible for [`StaticFileSegment::Receipts`] part of data.
 #[derive(Debug, Default)]
@@ -47,56 +47,4 @@ impl<DB: Database> Segment<DB> for Receipts {
 
         Ok(())
     }
-
-    fn create_static_file_file(
-        &self,
-        provider: &DatabaseProviderRO<DB>,
-        directory: &Path,
-        config: SegmentConfig,
-        block_range: RangeInclusive<BlockNumber>,
-    ) -> ProviderResult<()> {
-        let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
-        let tx_range_len = tx_range.clone().count();
-
-        let jar = prepare_jar::<DB, 1>(
-            provider,
-            directory,
-            StaticFileSegment::Receipts,
-            config,
-            block_range,
-            tx_range_len,
-            || {
-                Ok([dataset_for_compression::<DB, tables::Receipts>(
-                    provider,
-                    &tx_range,
-                    tx_range_len,
-                )?])
-            },
-        )?;
-
-        // Generate list of hashes for filters & PHF
-        let hashes = if config.filters.has_filters() {
-            Some(
-                provider
-                    .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
-                    .into_iter()
-                    .map(|(tx, _)| Ok(tx)),
-            )
-        } else {
-            None
-        };
-
-        create_static_file_T1::<tables::Receipts, TxNumber, SegmentHeader>(
-            provider.tx_ref(),
-            tx_range,
-            None,
-            // We already prepared the dictionary beforehand
-            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
-            hashes,
-            tx_range_len,
-            jar,
-        )?;
-
-        Ok(())
-    }
 }
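Receipts (and the near-identical Transactions diff that follows) are keyed by transaction number, which is why the removed code first translated the block range into a tx range. A hedged sketch of the surviving receipts copy path, inferred from the imports this file keeps (`BlockReader` for `block_body_indices`, `ProviderError` for the missing-indices case); `append_receipt` and the per-block writer bookkeeping it elides are assumptions:

```rust
use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
    providers::{StaticFileProvider, StaticFileWriter},
    BlockReader, DatabaseProviderRO,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::ops::RangeInclusive;

/// Hypothetical stand-in for the surviving receipts copy method.
fn copy_receipts_to_static_file<DB: Database>(
    provider: &DatabaseProviderRO<DB>,
    static_file_provider: StaticFileProvider,
    block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
    let mut writer = static_file_provider.latest_writer(StaticFileSegment::Receipts)?;
    let mut receipts = provider.tx_ref().cursor_read::<tables::Receipts>()?;

    for block in block_range {
        // Block -> tx-number range translation; this is why `BlockReader` and
        // `ProviderError` stay imported while `TransactionsProviderExt` goes.
        let indices = provider
            .block_body_indices(block)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // NB: the real writer also tracks per-block segment boundaries;
        // that bookkeeping is elided here.
        for entry in receipts.walk_range(indices.tx_num_range())? {
            let (tx_number, receipt) = entry?;
            writer.append_receipt(tx_number, receipt)?;
        }
    }
    Ok(())
}
```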
diff --git a/crates/static-file/static-file/src/segments/transactions.rs b/crates/static-file/static-file/src/segments/transactions.rs
index c7daeba06..4361f8ca6 100644
--- a/crates/static-file/static-file/src/segments/transactions.rs
+++ b/crates/static-file/static-file/src/segments/transactions.rs
@@ -1,14 +1,14 @@
-use crate::segments::{dataset_for_compression, prepare_jar, Segment};
-use alloy_primitives::{BlockNumber, TxNumber};
-use reth_db::{static_file::create_static_file_T1, tables};
+use crate::segments::Segment;
+use alloy_primitives::BlockNumber;
+use reth_db::tables;
 use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileWriter},
-    BlockReader, DatabaseProviderRO, TransactionsProviderExt,
+    BlockReader, DatabaseProviderRO,
 };
-use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
+use reth_static_file_types::StaticFileSegment;
 use reth_storage_errors::provider::{ProviderError, ProviderResult};
-use std::{ops::RangeInclusive, path::Path};
+use std::ops::RangeInclusive;
 
 /// Static File segment responsible for [`StaticFileSegment::Transactions`] part of data.
 #[derive(Debug, Default)]
@@ -53,56 +53,4 @@ impl<DB: Database> Segment<DB> for Transactions {
 
         Ok(())
     }
-
-    fn create_static_file_file(
-        &self,
-        provider: &DatabaseProviderRO<DB>,
-        directory: &Path,
-        config: SegmentConfig,
-        block_range: RangeInclusive<BlockNumber>,
-    ) -> ProviderResult<()> {
-        let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
-        let tx_range_len = tx_range.clone().count();
-
-        let jar = prepare_jar::<DB, 1>(
-            provider,
-            directory,
-            StaticFileSegment::Transactions,
-            config,
-            block_range,
-            tx_range_len,
-            || {
-                Ok([dataset_for_compression::<DB, tables::Transactions>(
-                    provider,
-                    &tx_range,
-                    tx_range_len,
-                )?])
-            },
-        )?;
-
-        // Generate list of hashes for filters & PHF
-        let hashes = if config.filters.has_filters() {
-            Some(
-                provider
-                    .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
-                    .into_iter()
-                    .map(|(tx, _)| Ok(tx)),
-            )
-        } else {
-            None
-        };
-
-        create_static_file_T1::<tables::Transactions, TxNumber, SegmentHeader>(
-            provider.tx_ref(),
-            tx_range,
-            None,
-            // We already prepared the dictionary beforehand
-            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
-            hashes,
-            tx_range_len,
-            jar,
-        )?;
-
-        Ok(())
-    }
 }
diff --git a/crates/storage/db/src/static_file/generation.rs b/crates/storage/db/src/static_file/generation.rs
deleted file mode 100644
index 9c2a64a23..000000000
--- a/crates/storage/db/src/static_file/generation.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-use crate::{RawKey, RawTable};
-use reth_db_api::{
-    cursor::DbCursorRO,
-    table::{Key, Table},
-    transaction::DbTx,
-};
-
-use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
-use reth_storage_errors::provider::{ProviderError, ProviderResult};
-use reth_tracing::tracing::*;
-use std::{error::Error as StdError, ops::RangeInclusive};
-
-/// Macro that generates static file creation functions that take an arbitrary number of [`Table`]
-/// and creates a [`NippyJar`] file out of their [`Table::Value`]. Each list of [`Table::Value`]
-/// from a table is a column of values.
-///
-/// Has membership filter set and compression dictionary support.
-macro_rules! generate_static_file_func {
-    ($(($($tbl:ident),+)),+ $(,)? ) => {
-        $(
-            paste::item! {
-                /// Creates a static file from specified tables. Each table's `Value` iterator represents a column.
-                ///
-                /// **Ensure the range contains the same number of rows.**
-                ///
-                /// * `tx`: Database transaction.
-                /// * `range`: Data range for columns in tables.
-                /// * `additional`: Additional columns which can't be straightforwardly walked on.
-                /// * `keys`: IntoIterator of keys (e.g. `TxHash` or `BlockHash`) with length equal to `row_count` and ordered by future column insertion from `range`.
-                /// * `dict_compression_set`: Sets of column data for compression dictionaries. Max size is 2GB. Row count is independent.
-                /// * `row_count`: Total rows to add to `NippyJar`. Must match row count in `range`.
-                /// * `nippy_jar`: Static File object responsible for file generation.
-                #[allow(non_snake_case)]
-                pub fn [<create_static_file_ $($tbl)_+>]<
-                    $($tbl: Table<Key = K>,)+
-                    K,
-                    H: NippyJarHeader
-                >
-                (
-                    tx: &impl DbTx,
-                    range: RangeInclusive<K>,
-                    additional: Option<Vec<Box<dyn Iterator<Item = Result<Vec<u8>, Box<dyn StdError + Send + Sync>>>>>>,
-                    dict_compression_set: Option<Vec<impl Iterator<Item = Vec<u8>>>>,
-                    keys: Option<impl Iterator<Item = ColumnResult<impl PHFKey>>>,
-                    row_count: usize,
-                    mut nippy_jar: NippyJar<H>
-                ) -> ProviderResult<()>
-                    where K: Key + Copy
-                {
-                    let additional = additional.unwrap_or_default();
-                    debug!(target: "reth::static_file", ?range, "Creating static file {:?} and {} more columns.", vec![$($tbl::NAME,)+], additional.len());
-
-                    let range: RangeInclusive<RawKey<K>> = RawKey::new(*range.start())..=RawKey::new(*range.end());
-
-                    // Create PHF and Filter if required
-                    if let Some(keys) = keys {
-                        debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list");
-                        match nippy_jar.prepare_index(keys, row_count) {
-                            Ok(_) => {
-                                debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
-                            },
-                            Err(e) => {
-                                return Err(ProviderError::NippyJar(e.to_string()));
-                            }
-                        }
-                    }
-
-                    // Create compression dictionaries if required
-                    if let Some(data_sets) = dict_compression_set {
-                        debug!(target: "reth::static_file", "Creating compression dictionaries.");
-                        match nippy_jar.prepare_compression(data_sets){
-                            Ok(_) => {
-                                debug!(target: "reth::static_file", "Compression dictionaries created.");
-                            },
-                            Err(e) => {
-                                return Err(ProviderError::NippyJar(e.to_string()));
-                            }
-                        }
-
-                    }
-
-                    // Creates the cursors for the columns
-                    $(
-                        let mut [< $tbl _cursor>] = tx.cursor_read::<RawTable<$tbl>>()?;
-                        let [< $tbl _iter>] = [< $tbl _cursor>]
-                            .walk_range(range.clone())?
-                            .into_iter()
-                            .map(|row|
-                                row
-                                    .map(|(_key, val)| val.into_value())
-                                    .map_err(|e| Box::new(e) as Box<dyn StdError + Send + Sync>)
-                            );
-
-                    )+
-
-                    // Create the static file from the data
-                    let col_iterators: Vec<Box<dyn Iterator<Item = Result<Vec<u8>,_>>>> = vec![
-                        $(Box::new([< $tbl _iter>]),)+
-                    ];
-
-
-                    debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file.");
-
-                    let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string()));
-
-                    debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated.");
-
-                    Ok(())
-                }
-            }
-        )+
-    };
-}
-
-generate_static_file_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);
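Stripped of the macro machinery, filters, and PHF, the deleted generator reduced to: one fallible iterator of serialized values per table column, handed to `NippyJar::freeze` together with a matching row count. A hypothetical single-column distillation (this helper exists nowhere in the tree; the `()` header type and construction comment are assumptions):

```rust
use reth_nippy_jar::NippyJar;
use std::error::Error as StdError;

/// Hypothetical helper: what one expansion of the macro boiled down to for a
/// single table, minus filters, PHF, and dictionary training.
fn freeze_single_column(
    rows: Vec<Vec<u8>>,    // one serialized `Table::Value` per row
    mut jar: NippyJar<()>, // e.g. NippyJar::new(1, &path, ()) plus compression config
) -> Result<(), Box<dyn StdError>> {
    let row_count = rows.len() as u64;
    // Same item type the macro's cursor walkers produced.
    let column: Box<dyn Iterator<Item = Result<Vec<u8>, Box<dyn StdError + Send + Sync>>>> =
        Box::new(rows.into_iter().map(Ok));
    // `freeze` consumes one fallible iterator per column plus the row count.
    jar.freeze(vec![column], row_count)?;
    Ok(())
}
```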
diff --git a/crates/storage/db/src/static_file/mod.rs b/crates/storage/db/src/static_file/mod.rs
index daa6f8a81..f27a574f6 100644
--- a/crates/storage/db/src/static_file/mod.rs
+++ b/crates/storage/db/src/static_file/mod.rs
@@ -1,13 +1,10 @@
 //! reth's static file database table import and access
 
-mod generation;
 use std::{
     collections::{hash_map::Entry, HashMap},
     path::Path,
 };
 
-pub use generation::*;
-
 mod cursor;
 pub use cursor::StaticFileCursor;
 
diff --git a/crates/storage/nippy-jar/src/compression/mod.rs b/crates/storage/nippy-jar/src/compression/mod.rs
index 76e8c6d16..28a92fe90 100644
--- a/crates/storage/nippy-jar/src/compression/mod.rs
+++ b/crates/storage/nippy-jar/src/compression/mod.rs
@@ -30,6 +30,7 @@ pub trait Compression: Serialize + for<'a> Deserialize<'a> {
         true
     }
 
+    #[cfg(test)]
     /// If required, prepares compression algorithm with an early pass on the data.
     fn prepare_compression(
         &mut self,
@@ -95,6 +96,7 @@ impl Compression for Compressors {
         }
     }
 
+    #[cfg(test)]
     fn prepare_compression(
         &mut self,
         columns: Vec<Vec<Vec<u8>>>,
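Gating a default trait method behind `#[cfg(test)]` means production builds do not even contain the symbol, and every implementation (and caller) must carry the same attribute, which is what the hunks above and the `zstd.rs` diff below do for `prepare_compression`. A minimal, self-contained illustration of the pattern with toy names:

```rust
/// Toy trait showing the gating pattern; names are illustrative only.
trait Compressionish {
    fn compress(&self, data: &[u8]) -> Vec<u8>;

    /// Training pass only exercised by unit tests.
    #[cfg(test)]
    fn prepare_compression(&mut self, samples: Vec<Vec<u8>>) -> Result<(), String>;
}

struct Passthrough;

impl Compressionish for Passthrough {
    fn compress(&self, data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }

    // Must be gated identically to the trait declaration, or non-test builds
    // fail with "method is not a member of trait".
    #[cfg(test)]
    fn prepare_compression(&mut self, _samples: Vec<Vec<u8>>) -> Result<(), String> {
        Ok(()) // nothing to train for a pass-through codec
    }
}
```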
diff --git a/crates/storage/nippy-jar/src/compression/zstd.rs b/crates/storage/nippy-jar/src/compression/zstd.rs
index c55ca103a..494d79de5 100644
--- a/crates/storage/nippy-jar/src/compression/zstd.rs
+++ b/crates/storage/nippy-jar/src/compression/zstd.rs
@@ -185,6 +185,7 @@ impl Compression for Zstd {
         matches!(self.state, ZstdState::Ready)
     }
 
+    #[cfg(test)]
     /// If using it with dictionaries, prepares a dictionary for each column.
     fn prepare_compression(
         &mut self,
@@ -208,7 +209,6 @@ impl Compression for Zstd {
             return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
         }
 
-        // TODO: parallel calculation
         let mut dictionaries = vec![];
         for column in columns {
             // ZSTD requires all training data to be continuous in memory, alongside the size of
@@ -273,6 +273,7 @@ impl<'a> std::fmt::Debug for ZstdDictionaries<'a> {
 }
 
 impl<'a> ZstdDictionaries<'a> {
+    #[cfg(test)]
     /// Creates [`ZstdDictionaries`].
     pub(crate) fn new(raw: Vec<RawDictionary>) -> Self {
         Self(raw.into_iter().map(ZstdDictionary::Raw).collect())
@@ -315,6 +316,7 @@ impl<'a> ZstdDictionaries<'a> {
 /// A Zstd dictionary. It's created and serialized with [`ZstdDictionary::Raw`], and deserialized as
 /// [`ZstdDictionary::Loaded`].
 pub(crate) enum ZstdDictionary<'a> {
+    #[allow(dead_code)]
    Raw(RawDictionary),
     Loaded(DecoderDictionary<'a>),
 }
diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs
index 434c40a9a..d42b0d364 100644
--- a/crates/storage/nippy-jar/src/cursor.rs
+++ b/crates/storage/nippy-jar/src/cursor.rs
@@ -67,7 +67,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
         self.row = 0;
     }
 
-    /// Returns a row, searching it by a key used during [`NippyJar::prepare_index`].
+    /// Returns a row, searching it by a key.
     ///
     /// **May return false positives.**
     ///
@@ -130,7 +130,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
         ))
     }
 
-    /// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a
+    /// Returns a row, searching it by a key using a
     /// `mask` to only read certain columns from the row.
     ///
    /// **May return false positives.**
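The dictionary training that is now test-only follows the comment retained in the `zstd.rs` hunk above: zstd wants one continuous sample buffer plus per-sample lengths. A sketch of the per-column step using the `zstd` crate's `from_continuous` directly; reth's `Zstd` wrapper is assumed to do the equivalent internally, one dictionary per column as `ZstdDictionaries` suggests:

```rust
/// Per-column dictionary training: flatten samples into one continuous buffer
/// plus a parallel length list, the exact shape `zstd::dict::from_continuous`
/// expects.
fn train_column_dictionary(
    column: &[Vec<u8>],
    max_dict_size: usize,
) -> std::io::Result<Vec<u8>> {
    let sizes: Vec<usize> = column.iter().map(|row| row.len()).collect();
    let continuous: Vec<u8> = column.concat();
    zstd::dict::from_continuous(&continuous, &sizes, max_dict_size)
}
```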
diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs
index 8247599d7..056f456eb 100644
--- a/crates/storage/nippy-jar/src/lib.rs
+++ b/crates/storage/nippy-jar/src/lib.rs
@@ -28,7 +28,9 @@ pub mod filter;
 use filter::{Cuckoo, InclusionFilter, InclusionFilters};
 
 pub mod compression;
-use compression::{Compression, Compressors};
+#[cfg(test)]
+use compression::Compression;
+use compression::Compressors;
 
 pub mod phf;
 pub use phf::PHFKey;
@@ -306,6 +308,56 @@ impl<H: NippyJarHeader> NippyJar<H> {
         DataReader::new(self.data_path())
     }
 
+    /// Writes all necessary configuration to file.
+    fn freeze_config(&self) -> Result<(), NippyJarError> {
+        // Atomic writes are hard:
+        let mut tmp_path = self.config_path();
+        tmp_path.set_extension(".tmp");
+
+        // Write to temporary file
+        let mut file = File::create(&tmp_path)?;
+        bincode::serialize_into(&mut file, &self)?;
+
+        // fsync() file
+        file.sync_all()?;
+
+        // Rename file, not move
+        reth_fs_util::rename(&tmp_path, self.config_path())?;
+
+        // fsync() dir
+        if let Some(parent) = tmp_path.parent() {
+            OpenOptions::new().read(true).open(parent)?.sync_all()?;
+        }
+        Ok(())
+    }
+}
+
+impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
+    fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
+        self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
+    }
+
+    fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
+        self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
+    }
+
+    fn size(&self) -> usize {
+        self.filter.as_ref().map(|f| f.size()).unwrap_or(0)
+    }
+}
+
+impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
+    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
+        self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
+    }
+
+    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
+        self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key)
+    }
+}
+
+#[cfg(test)]
+impl<H: NippyJarHeader> NippyJar<H> {
     /// If required, prepares any compression algorithm to an early pass of the data.
     pub fn prepare_compression(
         &mut self,
         columns: Vec<Vec<Vec<u8>>>,
@@ -429,53 +481,6 @@ impl<H: NippyJarHeader> NippyJar<H> {
 
         Ok(())
     }
-
-    /// Writes all necessary configuration to file.
-    fn freeze_config(&self) -> Result<(), NippyJarError> {
-        // Atomic writes are hard:
-        let mut tmp_path = self.config_path();
-        tmp_path.set_extension(".tmp");
-
-        // Write to temporary file
-        let mut file = File::create(&tmp_path)?;
-        bincode::serialize_into(&mut file, &self)?;
-
-        // fsync() file
-        file.sync_all()?;
-
-        // Rename file, not move
-        reth_fs_util::rename(&tmp_path, self.config_path())?;
-
-        // fsync() dir
-        if let Some(parent) = tmp_path.parent() {
-            OpenOptions::new().read(true).open(parent)?.sync_all()?;
-        }
-        Ok(())
-    }
-}
-
-impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
-    fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
-        self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
-    }
-
-    fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
-        self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element)
-    }
-
-    fn size(&self) -> usize {
-        self.filter.as_ref().map(|f| f.size()).unwrap_or(0)
-    }
-}
-
-impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
-    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
-        self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
-    }
-
-    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
-        self.phf.as_ref().ok_or(NippyJarError::PHFMissing)?.get_index(key)
-    }
 }
 
 /// Manages the reading of static file data using memory-mapped files.
@@ -581,6 +586,7 @@ impl DataReader {
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use compression::Compression;
     use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
     use std::{collections::HashSet, fs::OpenOptions};
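The relocated `InclusionFilter`/`PerfectHashingFunction` impls simply forward to the jar's optional `filter`/`phf` fields. A hedged usage sketch; the module paths and the `()` header type are assumptions based on the `pub mod filter;`/`pub mod phf;` declarations visible above:

```rust
use reth_nippy_jar::{filter::InclusionFilter, phf::PerfectHashingFunction, NippyJar, NippyJarError};
use std::path::Path;

fn index_keys(keys: &[Vec<u8>], path: &Path) -> Result<(), NippyJarError> {
    // `()` as the user header type; real callers use `SegmentHeader`.
    let mut jar = NippyJar::new(1, path, ()).with_cuckoo_filter(keys.len()).with_fmph();
    for key in keys {
        // Forwards to the inner cuckoo filter, or errors with `FilterMissing`.
        InclusionFilter::add(&mut jar, key)?;
    }
    // Forwards to the fmph PHF, or errors with `PHFMissing`.
    PerfectHashingFunction::set_keys(&mut jar, keys)?;
    // Filter lookups may be false positives, hence the cursor docs' caveat.
    assert!(InclusionFilter::contains(&jar, &keys[0])?);
    Ok(())
}
```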
diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs
index e7073defe..c5abdbe00 100644
--- a/crates/storage/provider/src/providers/static_file/mod.rs
+++ b/crates/storage/provider/src/providers/static_file/mod.rs
@@ -59,29 +59,16 @@ mod tests {
     use super::*;
     use crate::{test_utils::create_test_provider_factory, HeaderProvider};
     use rand::seq::SliceRandom;
-    use reth_db::{
-        static_file::create_static_file_T1_T2_T3, CanonicalHeaders, HeaderNumbers,
-        HeaderTerminalDifficulties, Headers, RawTable,
-    };
-    use reth_db_api::{
-        cursor::DbCursorRO,
-        transaction::{DbTx, DbTxMut},
-    };
-    use reth_primitives::{static_file::find_fixed_range, BlockNumber, B256, U256};
+    use reth_db::{CanonicalHeaders, HeaderNumbers, HeaderTerminalDifficulties, Headers};
+    use reth_db_api::transaction::DbTxMut;
+    use reth_primitives::{static_file::find_fixed_range, B256, U256};
     use reth_testing_utils::generators::{self, random_header_range};
-    use std::vec::IntoIter;
 
     #[test]
     fn test_snap() {
         // Ranges
         let row_count = 100u64;
         let range = 0..=(row_count - 1);
-        let segment_header = SegmentHeader::new(
-            range.clone().into(),
-            Some(range.clone().into()),
-            Some(range.clone().into()),
-            StaticFileSegment::Headers,
-        );
 
         // Data sources
         let factory = create_test_provider_factory();
@@ -113,46 +100,22 @@ mod tests {
 
         // Create StaticFile
         {
-            let with_compression = true;
-            let with_filter = true;
+            let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
+            let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
+            let mut td = U256::ZERO;
 
-            let mut nippy_jar = NippyJar::new(3, static_file.as_path(), segment_header);
-
-            if with_compression {
-                nippy_jar = nippy_jar.with_zstd(false, 0);
+            for header in headers.clone() {
+                td += header.header().difficulty;
+                let hash = header.hash();
+                writer.append_header(header.unseal(), td, hash).unwrap();
             }
-
-            if with_filter {
-                nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
-            }
-
-            let provider = factory.provider().unwrap();
-            let tx = provider.tx_ref();
-
-            let none_vec: Option<Vec<IntoIter<Vec<u8>>>> = None;
-
-            // Generate list of hashes for filters & PHF
-            let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>().unwrap();
-            let hashes = cursor
-                .walk(None)
-                .unwrap()
-                .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
-
-            create_static_file_T1_T2_T3::<
-                Headers,
-                HeaderTerminalDifficulties,
-                CanonicalHeaders,
-                BlockNumber,
-                SegmentHeader,
-            >(tx, range, None, none_vec, Some(hashes), row_count as usize, nippy_jar)
-            .unwrap();
+            writer.commit().unwrap();
         }
 
         // Use providers to query Header data and compare if it matches
         {
             let db_provider = factory.provider().unwrap();
-            let manager =
-                StaticFileProvider::read_write(static_files_path.path()).unwrap().with_filters();
+            let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
             let jar_provider = manager
                 .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
                 .unwrap();
@@ -168,12 +131,12 @@ mod tests {
 
                 // Compare Header
                 assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
-                assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
+                assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
 
                 // Compare HeaderTerminalDifficulties
                 assert_eq!(
                     db_provider.header_td(&header_hash).unwrap().unwrap(),
-                    jar_provider.header_td(&header_hash).unwrap().unwrap()
+                    jar_provider.header_td_by_number(header.number).unwrap().unwrap()
                 );
             }
         }
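Condensing the rewritten test, the end-to-end flow after this change runs entirely through `StaticFileProvider`, with no hand-rolled `NippyJar` setup and block-number keyed reads in place of the old hash-keyed PHF lookups. A hedged round-trip sketch (signatures inferred from the test above, not quoted):

```rust
use reth_primitives::{SealedHeader, U256};
use reth_provider::{
    providers::{StaticFileProvider, StaticFileWriter},
    HeaderProvider,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::path::Path;

/// Hypothetical round trip: write one header through the managed writer,
/// then read it back with number-keyed queries.
fn headers_round_trip(dir: &Path, sealed: SealedHeader, td: U256) -> ProviderResult<()> {
    let manager = StaticFileProvider::read_write(dir)?;

    // Write side: the provider handles file naming, segment headers, and config.
    let mut writer = manager.latest_writer(StaticFileSegment::Headers)?;
    let hash = sealed.hash();
    writer.append_header(sealed.unseal(), td, hash)?;
    writer.commit()?;

    // Read side: number-keyed reads replace the old hash-keyed PHF lookups.
    let header = manager.header_by_number(0)?;
    let header_td = manager.header_td_by_number(0)?;
    assert!(header.is_some() && header_td.is_some());
    Ok(())
}
```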