rm nippyjar from reth-interfaces (#7081)

This commit is contained in:
back
2024-03-12 01:29:59 -07:00
committed by GitHub
parent 9707cb2b8d
commit 024bb264b7
9 changed files with 77 additions and 38 deletions

1
Cargo.lock generated
View File

@ -5926,7 +5926,6 @@ dependencies = [
"rand 0.8.5",
"reth-eth-wire",
"reth-network-api",
"reth-nippy-jar",
"reth-primitives",
"reth-rpc-types",
"secp256k1 0.27.0",

View File

@ -12,7 +12,6 @@ workspace = true
[dependencies]
reth-primitives.workspace = true
reth-nippy-jar.workspace = true
reth-rpc-types.workspace = true
reth-network-api.workspace = true
# TODO(onbjerg): We only need this for [BlockBody]

View File

@ -130,12 +130,6 @@ pub enum ProviderError {
BlockNumberOverflow(U256),
}
impl From<reth_nippy_jar::NippyJarError> for ProviderError {
fn from(err: reth_nippy_jar::NippyJarError) -> Self {
ProviderError::NippyJar(err.to_string())
}
}
impl From<reth_primitives::fs::FsPathError> for ProviderError {
fn from(err: reth_primitives::fs::FsPathError) -> Self {
ProviderError::FsPathError(err.to_string())

View File

@ -21,7 +21,9 @@ use reth_primitives::{
},
BlockNumber, StaticFileSegment,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO, TransactionsProviderExt};
use reth_provider::{
providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
@ -82,7 +84,9 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset.to_vec())?;
nippy_jar
.prepare_compression(dataset.to_vec())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,

View File

@ -1,7 +1,7 @@
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor};
use reth_primitives::{static_file::SegmentHeader, B256};
use std::sync::Arc;
@ -13,7 +13,10 @@ pub struct StaticFileCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> StaticFileCursor<'a> {
/// Returns a new [`StaticFileCursor`].
pub fn new(jar: &'a NippyJar<SegmentHeader>, reader: Arc<DataReader>) -> ProviderResult<Self> {
Ok(Self(NippyJarCursor::with_reader(jar, reader)?))
Ok(Self(
NippyJarCursor::with_reader(jar, reader)
.map_err(|err| ProviderError::NippyJar(err.to_string()))?,
))
}
/// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of
@ -43,7 +46,8 @@ impl<'a> StaticFileCursor<'a> {
}
None => Ok(None),
},
}?;
}
.map_or(None, |v| v);
Ok(row)
}

View File

@ -5,7 +5,7 @@ use crate::{
RawKey, RawTable,
};
use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
use reth_tracing::tracing::*;
use std::{error::Error as StdError, ops::RangeInclusive};
@ -55,15 +55,28 @@ macro_rules! generate_static_file_func {
// Create PHF and Filter if required
if let Some(keys) = keys {
debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list");
nippy_jar.prepare_index(keys, row_count)?;
match nippy_jar.prepare_index(keys, row_count) {
Ok(_) => {
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}
// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
debug!(target: "reth::static_file", "Creating compression dictionaries.");
nippy_jar.prepare_compression(data_sets)?;
match nippy_jar.prepare_compression(data_sets){
Ok(_) => {
debug!(target: "reth::static_file", "Compression dictionaries created.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}
// Creates the cursors for the columns
@ -88,7 +101,7 @@ macro_rules! generate_static_file_func {
debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file.");
let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?;
let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string()));
debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated.");

View File

@ -118,7 +118,8 @@ impl StaticFileProvider {
pub fn report_metrics(&self) -> ProviderResult<()> {
let Some(metrics) = &self.metrics else { return Ok(()) };
let static_files = iter_static_files(&self.path)?;
let static_files =
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
for (segment, ranges) in static_files {
let mut entries = 0;
let mut size = 0;
@ -243,14 +244,15 @@ impl StaticFileProvider {
} else {
let mut jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_block_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}
jar
};
jar.delete()?;
jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
let mut segment_max_block = None;
if fixed_block_range.start() > 0 {
@ -276,9 +278,10 @@ impl StaticFileProvider {
jar.into()
} else {
let path = self.path.join(segment.filename(fixed_block_range));
let mut jar = NippyJar::load(&path)?;
let mut jar =
NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}
self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
@ -353,7 +356,8 @@ impl StaticFileProvider {
let jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
// Updates the tx index by first removing all entries which have a higher
// block_start than our current static file.
@ -416,7 +420,9 @@ impl StaticFileProvider {
tx_index.clear();
for (segment, ranges) in iter_static_files(&self.path)? {
for (segment, ranges) in
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?
{
// Update last block for each segment
if let Some((block_range, _)) = ranges.last() {
max_block.insert(segment, block_range.end());

View File

@ -9,7 +9,7 @@ pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
mod metrics;
use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::NippyJar;
use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
use std::{ops::Deref, sync::Arc};
@ -28,9 +28,14 @@ pub struct LoadedJar {
impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
let mmap_handle = Arc::new(jar.open_data_reader()?);
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })
}
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}
}
/// Returns a clone of the mmap handle that can be used to instantiate a cursor.
fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {

View File

@ -64,7 +64,11 @@ impl StaticFileProviderRW {
block_range.start(),
None,
) {
Ok(provider) => (NippyJar::load(provider.data_path())?, provider.data_path().into()),
Ok(provider) => (
NippyJar::load(provider.data_path())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?,
provider.data_path().into(),
),
Err(ProviderError::MissingStaticFileBlock(_, _)) => {
let path = static_file_provider.directory().join(segment.filename(&block_range));
(create_jar(segment, &path, block_range), path)
@ -78,7 +82,7 @@ impl StaticFileProviderRW {
// This static file has been frozen, so we should
Err(ProviderError::FinalizedStaticFile(segment, block))
}
Err(e) => Err(e.into()),
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}?;
if let Some(metrics) = &metrics {
@ -97,7 +101,7 @@ impl StaticFileProviderRW {
let start = Instant::now();
// Commits offsets and new user_header to disk
self.writer.commit()?;
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
@ -128,7 +132,9 @@ impl StaticFileProviderRW {
let start = Instant::now();
// Commits offsets and new user_header to disk
self.writer.commit_without_sync_all()?;
self.writer
.commit_without_sync_all()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
@ -249,11 +255,16 @@ impl StaticFileProviderRW {
self.writer = writer;
self.data_path = data_path;
NippyJar::<SegmentHeader>::load(&previous_snap)?.delete()?;
NippyJar::<SegmentHeader>::load(&previous_snap)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?
.delete()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
} else {
// Update `SegmentHeader`
self.writer.user_header_mut().prune(len);
self.writer.prune_rows(len as usize)?;
self.writer
.prune_rows(len as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
break
}
@ -263,7 +274,9 @@ impl StaticFileProviderRW {
self.writer.user_header_mut().prune(num_rows);
// Truncate data
self.writer.prune_rows(num_rows as usize)?;
self.writer
.prune_rows(num_rows as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
num_rows = 0;
}
}
@ -285,7 +298,9 @@ impl StaticFileProviderRW {
self.buf.clear();
column.to_compact(&mut self.buf);
self.writer.append_column(Some(Ok(&self.buf)))?;
self.writer
.append_column(Some(Ok(&self.buf)))
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Ok(())
}