feat: search for a snapshot that fulfills a queried BlockHash or TxHash (#5373)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
joshieDo
2023-11-15 16:53:28 +00:00
committed by GitHub
parent dc72cad838
commit a389a2b42d
11 changed files with 519 additions and 131 deletions

View File

@ -10,7 +10,8 @@ use reth_primitives::{
BlockHash, ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError,
ProviderFactory, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{
@ -38,9 +39,15 @@ impl Command {
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Headers.filename(&range),
SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range),
SnapshotSegment::Headers.filename(&range, &tx_range),
SnapshotSegment::Headers.filename_with_configuration(
filters,
compression,
&range,
&tx_range,
),
)?;
Ok(())
@ -61,16 +68,24 @@ impl Command {
Filters::WithoutFilters
};
let range = self.block_range();
let block_range = self.block_range();
let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut row_indexes = block_range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let path = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &range)
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range.clone())?;
let path: PathBuf = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Headers,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {

View File

@ -27,21 +27,26 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let range = self.block_range();
let block_range = self.block_range();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};
let segment = segments::Receipts::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
let segment: segments::Receipts = segments::Receipts::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), block_range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Receipts.filename(&range),
SnapshotSegment::Receipts.filename_with_configuration(filters, compression, &range),
SnapshotSegment::Receipts.filename(&block_range, &tx_range),
SnapshotSegment::Receipts.filename_with_configuration(
filters,
compression,
&block_range,
&tx_range,
),
)?;
Ok(())
@ -62,7 +67,7 @@ impl Command {
Filters::WithoutFilters
};
let block_range = self.from..=(self.from + self.block_interval - 1);
let block_range = self.block_range();
let mut rng = rand::thread_rng();
@ -72,13 +77,16 @@ impl Command {
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let path = SnapshotSegment::Receipts
.filename_with_configuration(filters, compression, &block_range)
let path: PathBuf = SnapshotSegment::Receipts
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Receipts,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {

View File

@ -27,7 +27,7 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let range = self.block_range();
let block_range = self.block_range();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
@ -36,12 +36,18 @@ impl Command {
let segment = segments::Transactions::new(compression, filters);
segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
segment.snapshot::<DB>(provider, PathBuf::default(), block_range.clone())?;
// Default name doesn't have any configuration
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
reth_primitives::fs::rename(
SnapshotSegment::Transactions.filename(&range),
SnapshotSegment::Transactions.filename_with_configuration(filters, compression, &range),
SnapshotSegment::Transactions.filename(&block_range, &tx_range),
SnapshotSegment::Transactions.filename_with_configuration(
filters,
compression,
&block_range,
&tx_range,
),
)?;
Ok(())
@ -62,7 +68,7 @@ impl Command {
Filters::WithoutFilters
};
let block_range = self.from..=(self.from + self.block_interval - 1);
let block_range = self.block_range();
let mut rng = rand::thread_rng();
@ -72,12 +78,15 @@ impl Command {
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let path = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &block_range)
let path: PathBuf = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Transactions,
self.from,
Some(&path),
)?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {

View File

@ -489,8 +489,8 @@ mod tests {
};
use reth_primitives::{
constants::eip4844::DATA_GAS_PER_BLOB, hex_literal::hex, proofs, Account, Address,
BlockBody, BlockHash, BlockHashOrNumber, Bytes, ChainSpecBuilder, ForkCondition, Header,
Signature, TransactionKind, TransactionSigned, Withdrawal, MAINNET, U256,
BlockBody, BlockHash, BlockHashOrNumber, Bytes, ChainSpecBuilder, Header, Signature,
TransactionKind, TransactionSigned, Withdrawal, MAINNET, U256,
};
use std::ops::RangeBounds;

View File

@ -1,6 +1,8 @@
use reth_primitives::{
Address, BlockHash, BlockHashOrNumber, BlockNumber, GotExpected, TxHashOrNumber, TxNumber, B256,
Address, BlockHash, BlockHashOrNumber, BlockNumber, GotExpected, SnapshotSegment,
TxHashOrNumber, TxNumber, B256,
};
use std::path::PathBuf;
use thiserror::Error;
/// Bundled errors variants thrown by various providers.
@ -94,6 +96,15 @@ pub enum ProviderError {
/// Provider does not support this particular request.
#[error("this provider does not support this request")]
UnsupportedProvider,
/// Snapshot file is not found at specified path.
#[error("not able to find {0} snapshot file at {1}")]
MissingSnapshotPath(SnapshotSegment, PathBuf),
/// Snapshot file is not found for requested block.
#[error("not able to find {0} snapshot file for block number {1}")]
MissingSnapshotBlock(SnapshotSegment, BlockNumber),
/// Snapshot file is not found for requested transaction.
#[error("not able to find {0} snapshot file for transaction id {1}")]
MissingSnapshotTx(SnapshotSegment, TxNumber),
}
/// A root mismatch error at a given block height.

View File

@ -4,11 +4,14 @@ mod compression;
mod filters;
mod segment;
use alloy_primitives::BlockNumber;
use alloy_primitives::{BlockNumber, TxNumber};
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment};
use crate::fs::FsPathError;
use std::{ops::RangeInclusive, path::Path};
/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
@ -35,4 +38,30 @@ impl HighestSnapshots {
SnapshotSegment::Receipts => self.receipts,
}
}
/// Returns a mutable reference to a snapshot segment
pub fn as_mut(&mut self, segment: SnapshotSegment) -> &mut Option<BlockNumber> {
match segment {
SnapshotSegment::Headers => &mut self.headers,
SnapshotSegment::Transactions => &mut self.transactions,
SnapshotSegment::Receipts => &mut self.receipts,
}
}
}
/// Given the snapshot's location, it returns an iterator over the existing snapshots in the format
/// of a tuple composed by the segment, block range and transaction range.
pub fn iter_snapshots(
path: impl AsRef<Path>,
) -> Result<
impl Iterator<Item = (SnapshotSegment, RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>,
FsPathError,
> {
let entries = crate::fs::read_dir(path.as_ref())?.filter_map(Result::ok);
Ok(entries.filter_map(|entry| {
if entry.metadata().map_or(false, |metadata| metadata.is_file()) {
return SnapshotSegment::parse_filename(&entry.file_name())
}
None
}))
}

View File

@ -2,8 +2,9 @@ use crate::{
snapshot::{Compression, Filters, InclusionFilter},
BlockNumber, TxNumber,
};
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::{ops::RangeInclusive, str::FromStr};
use std::{ffi::OsStr, ops::RangeInclusive, str::FromStr};
use strum::{AsRefStr, EnumString};
#[derive(
@ -19,6 +20,7 @@ use strum::{AsRefStr, EnumString};
Serialize,
EnumString,
AsRefStr,
Display,
)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Segment of the data that can be snapshotted.
@ -53,10 +55,21 @@ impl SnapshotSegment {
}
/// Returns the default file name for the provided segment and range.
pub fn filename(&self, range: &RangeInclusive<BlockNumber>) -> String {
pub fn filename(
&self,
block_range: &RangeInclusive<BlockNumber>,
tx_range: &RangeInclusive<TxNumber>,
) -> String {
// ATTENTION: if changing the name format, be sure to reflect those changes in
// [`Self::parse_filename`].
format!("snapshot_{}_{}_{}", self.as_ref(), range.start(), range.end(),)
format!(
"snapshot_{}_{}_{}_{}_{}",
self.as_ref(),
block_range.start(),
block_range.end(),
tx_range.start(),
tx_range.end(),
)
}
/// Returns file name for the provided segment and range, alongisde filters, compression.
@ -64,9 +77,10 @@ impl SnapshotSegment {
&self,
filters: Filters,
compression: Compression,
range: &RangeInclusive<BlockNumber>,
block_range: &RangeInclusive<BlockNumber>,
tx_range: &RangeInclusive<TxNumber>,
) -> String {
let prefix = self.filename(range);
let prefix = self.filename(block_range, tx_range);
let filters_name = match filters {
Filters::WithFilters(inclusion_filter, phf) => {
@ -80,20 +94,41 @@ impl SnapshotSegment {
format!("{prefix}_{}_{}", filters_name, compression.as_ref())
}
/// Takes a filename and parses the [`SnapshotSegment`] and its inclusive range.
pub fn parse_filename(name: &str) -> Option<(Self, RangeInclusive<BlockNumber>)> {
let parts: Vec<&str> = name.split('_').collect();
if let (Ok(segment), true) = (Self::from_str(parts[1]), parts.len() >= 4) {
let start: u64 = parts[2].parse().unwrap_or(0);
let end: u64 = parts[3].parse().unwrap_or(0);
if start <= end || parts[0] != "snapshot" {
return None
/// Parses a filename into a `SnapshotSegment` and its corresponding block and transaction
/// ranges.
///
/// The filename is expected to follow the format:
/// "snapshot_{segment}_{block_start}_{block_end}_{tx_start}_{tx_end}". This function checks
/// for the correct prefix ("snapshot"), and then parses the segment and the inclusive
/// ranges for blocks and transactions. It ensures that the start of each range is less than the
/// end.
///
/// # Returns
/// - `Some((segment, block_range, tx_range))` if parsing is successful and all conditions are
/// met.
/// - `None` if any condition fails, such as an incorrect prefix, parsing error, or invalid
/// range.
///
/// # Note
/// This function is tightly coupled with the naming convention defined in [`Self::filename`].
/// Any changes in the filename format in `filename` should be reflected here.
pub fn parse_filename(
name: &OsStr,
) -> Option<(Self, RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)> {
let mut parts = name.to_str()?.split('_');
if parts.next() != Some("snapshot") {
return None;
}
return Some((segment, start..=end))
let segment = Self::from_str(parts.next()?).ok()?;
let (block_start, block_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
let (tx_start, tx_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
if block_start >= block_end || tx_start > tx_end {
return None;
}
None
Some((segment, block_start..=block_end, tx_start..=tx_end))
}
}
@ -145,3 +180,92 @@ pub struct SegmentConfig {
/// Compression used on the segment
pub compression: Compression,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_filename() {
let test_vectors = [
(SnapshotSegment::Headers, 2..=30, 0..=1, "snapshot_headers_2_30_0_1", None),
(
SnapshotSegment::Receipts,
30..=300,
110..=1000,
"snapshot_receipts_30_300_110_1000",
None,
),
(
SnapshotSegment::Transactions,
1_123_233..=11_223_233,
1_123_233..=2_123_233,
"snapshot_transactions_1123233_11223233_1123233_2123233",
None,
),
(
SnapshotSegment::Headers,
2..=30,
0..=1,
"snapshot_headers_2_30_0_1_cuckoo-fmph_lz4",
Some((
Compression::Lz4,
Filters::WithFilters(
InclusionFilter::Cuckoo,
crate::snapshot::PerfectHashingFunction::Fmph,
),
)),
),
(
SnapshotSegment::Headers,
2..=30,
0..=1,
"snapshot_headers_2_30_0_1_cuckoo-fmph_zstd",
Some((
Compression::Zstd,
Filters::WithFilters(
InclusionFilter::Cuckoo,
crate::snapshot::PerfectHashingFunction::Fmph,
),
)),
),
(
SnapshotSegment::Headers,
2..=30,
0..=1,
"snapshot_headers_2_30_0_1_cuckoo-fmph_zstd-dict",
Some((
Compression::ZstdWithDictionary,
Filters::WithFilters(
InclusionFilter::Cuckoo,
crate::snapshot::PerfectHashingFunction::Fmph,
),
)),
),
];
for (segment, block_range, tx_range, filename, configuration) in test_vectors {
if let Some((compression, filters)) = configuration {
assert_eq!(
segment.filename_with_configuration(
filters,
compression,
&block_range,
&tx_range
),
filename
);
} else {
assert_eq!(segment.filename(&block_range, &tx_range), filename);
}
assert_eq!(
SnapshotSegment::parse_filename(OsStr::new(filename)),
Some((segment, block_range, tx_range))
);
}
assert_eq!(SnapshotSegment::parse_filename(OsStr::new("snapshot_headers_2_30_3_2")), None);
assert_eq!(SnapshotSegment::parse_filename(OsStr::new("snapshot_headers_2_30_1")), None);
}
}

View File

@ -69,7 +69,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
&directory.as_ref().join(segment.filename(&block_range).as_str()),
&directory.as_ref().join(segment.filename(&block_range, &tx_range).as_str()),
SegmentHeader::new(block_range, tx_range, segment),
);

View File

@ -4,10 +4,16 @@ use crate::{segments, segments::Segment, SnapshotterError};
use reth_db::database::Database;
use reth_interfaces::{RethError, RethResult};
use reth_primitives::{
snapshot::HighestSnapshots, BlockNumber, ChainSpec, SnapshotSegment, TxNumber,
snapshot::{iter_snapshots, HighestSnapshots},
BlockNumber, ChainSpec, TxNumber,
};
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory, TransactionsProviderExt};
use std::{
collections::HashMap,
ops::RangeInclusive,
path::{Path, PathBuf},
sync::Arc,
};
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory};
use std::{collections::HashMap, ops::RangeInclusive, path::PathBuf, sync::Arc};
use tokio::sync::watch;
use tracing::warn;
@ -89,7 +95,7 @@ impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter].
pub fn new(
db: DB,
snapshots_path: PathBuf,
snapshots_path: impl AsRef<Path>,
chain_spec: Arc<ChainSpec>,
block_interval: u64,
) -> RethResult<Self> {
@ -97,8 +103,7 @@ impl<DB: Database> Snapshotter<DB> {
let mut snapshotter = Self {
provider_factory: ProviderFactory::new(db, chain_spec),
snapshots_path,
// TODO(alexey): fill from on-disk snapshot data
snapshots_path: snapshots_path.as_ref().into(),
highest_snapshots: HighestSnapshots::default(),
highest_snapshots_notifier,
highest_snapshots_tracker,
@ -152,23 +157,10 @@ impl<DB: Database> Snapshotter<DB> {
// It walks over the directory and parses the snapshot filenames extracting
// `SnapshotSegment` and their inclusive range. It then takes the maximum block
// number for each specific segment.
for (segment, range) in reth_primitives::fs::read_dir(&self.snapshots_path)?
.filter_map(Result::ok)
.filter_map(|entry| {
if let Ok(true) = entry.metadata().map(|metadata| metadata.is_file()) {
return SnapshotSegment::parse_filename(&entry.file_name().to_string_lossy())
}
None
})
{
let max_segment_block = match segment {
SnapshotSegment::Headers => &mut self.highest_snapshots.headers,
SnapshotSegment::Transactions => &mut self.highest_snapshots.transactions,
SnapshotSegment::Receipts => &mut self.highest_snapshots.receipts,
};
if max_segment_block.map_or(true, |block| block < *range.end()) {
*max_segment_block = Some(*range.end());
for (segment, block_range, _) in iter_snapshots(&self.snapshots_path)? {
let max_segment_block = self.highest_snapshots.as_mut(segment);
if max_segment_block.map_or(true, |block| block < *block_range.end()) {
*max_segment_block = Some(*block_range.end());
}
}
@ -218,13 +210,11 @@ impl<DB: Database> Snapshotter<DB> {
) -> RethResult<()> {
if let Some(block_range) = block_range {
let temp = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY);
let filename = S::segment().filename(&block_range);
let provider = self.provider_factory.provider()?;
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let filename = S::segment().filename(&block_range, &tx_range);
S::default().snapshot::<DB>(
&self.provider_factory.provider()?,
temp.clone(),
block_range.clone(),
)?;
S::default().snapshot::<DB>(&provider, temp.clone(), block_range)?;
reth_primitives::fs::rename(temp.join(&filename), self.snapshots_path.join(filename))?;
}

View File

@ -1,23 +1,45 @@
use super::{LoadedJar, SnapshotJarProvider};
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use dashmap::DashMap;
use reth_interfaces::RethResult;
use parking_lot::RwLock;
use reth_db::{
codecs::CompactU256,
snapshot::{HeaderMask, TransactionMask},
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{HighestSnapshots, BLOCKS_PER_SNAPSHOT},
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
snapshot::HighestSnapshots, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo,
Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use revm::primitives::HashMap;
use std::{
collections::BTreeMap,
ops::{RangeBounds, RangeInclusive},
path::{Path, PathBuf},
};
use std::{ops::RangeBounds, path::PathBuf};
use tokio::sync::watch;
/// SnapshotProvider
/// Alias type for a map that can be queried for transaction/block ranges from a block/transaction
/// segment respectively. It uses `BlockNumber` to represent the block end of a snapshot range or
/// `TxNumber` to represent the transaction end of a snapshot range.
///
/// Can be in one of the two formats:
/// - `HashMap<SnapshotSegment, BTreeMap<BlockNumber, RangeInclusive<TxNumber>>>`
/// - `HashMap<SnapshotSegment, BTreeMap<TxNumber, RangeInclusive<BlockNumber>>>`
type SegmentRanges = HashMap<SnapshotSegment, BTreeMap<u64, RangeInclusive<u64>>>;
/// [`SnapshotProvider`] manages all existing [`SnapshotJarProvider`].
#[derive(Debug, Default)]
pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>,
/// Available snapshot ranges on disk indexed by max blocks.
snapshots_block_index: RwLock<SegmentRanges>,
/// Available snapshot ranges on disk indexed by max transactions.
snapshots_tx_index: RwLock<SegmentRanges>,
/// Tracks the highest snapshot of every segment.
highest_tracker: Option<watch::Receiver<Option<HighestSnapshots>>>,
/// Directory where snapshots are located
@ -26,8 +48,14 @@ pub struct SnapshotProvider {
impl SnapshotProvider {
/// Creates a new [`SnapshotProvider`].
pub fn new(path: PathBuf) -> Self {
Self { map: Default::default(), highest_tracker: None, path }
pub fn new(path: impl AsRef<Path>) -> Self {
Self {
map: Default::default(),
snapshots_block_index: Default::default(),
snapshots_tx_index: Default::default(),
highest_tracker: None,
path: path.as_ref().to_path_buf(),
}
}
/// Adds a highest snapshot tracker to the provider
@ -39,30 +67,137 @@ impl SnapshotProvider {
self
}
/// Gets the provider of the requested segment and range.
pub fn get_segment_provider(
/// Gets the [`SnapshotJarProvider`] of the requested segment and block.
pub fn get_segment_provider_from_block(
&self,
segment: SnapshotSegment,
block: BlockNumber,
mut path: Option<PathBuf>,
path: Option<&Path>,
) -> RethResult<SnapshotJarProvider<'_>> {
// TODO this invalidates custom length snapshots.
let snapshot = block / BLOCKS_PER_SNAPSHOT;
let key = (snapshot, segment);
self.get_segment_provider(
segment,
|| self.get_segment_ranges_from_block(segment, block),
path,
)?
.ok_or_else(|| ProviderError::MissingSnapshotBlock(segment, block).into())
}
/// Gets the [`SnapshotJarProvider`] of the requested segment and transaction.
pub fn get_segment_provider_from_transaction(
&self,
segment: SnapshotSegment,
tx: TxNumber,
path: Option<&Path>,
) -> RethResult<SnapshotJarProvider<'_>> {
self.get_segment_provider(
segment,
|| self.get_segment_ranges_from_transaction(segment, tx),
path,
)?
.ok_or_else(|| ProviderError::MissingSnapshotTx(segment, tx).into())
}
/// Gets the [`SnapshotJarProvider`] of the requested segment and block or transaction.
pub fn get_segment_provider(
&self,
segment: SnapshotSegment,
fn_ranges: impl Fn() -> Option<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>,
path: Option<&Path>,
) -> RethResult<Option<SnapshotJarProvider<'_>>> {
// If we have a path, then get the block range and transaction range from its name.
// Otherwise, check `self.available_snapshots`
let snapshot_ranges = match path {
Some(path) => {
SnapshotSegment::parse_filename(path.file_name().ok_or_else(|| {
ProviderError::MissingSnapshotPath(segment, path.to_path_buf())
})?)
.and_then(|(parsed_segment, block_range, tx_range)| {
if parsed_segment == segment {
return Some((block_range, tx_range));
}
None
})
}
None => fn_ranges(),
};
// Return cached `LoadedJar` or insert it for the first time, and then, return it.
if let Some((block_range, tx_range)) = snapshot_ranges {
return Ok(Some(self.get_or_create_jar_provider(segment, &block_range, &tx_range)?));
}
Ok(None)
}
/// Given a segment, block range and transaction range it returns a cached
/// [`SnapshotJarProvider`]. TODO: we should check the size and pop N if there's too many.
fn get_or_create_jar_provider(
&self,
segment: SnapshotSegment,
block_range: &RangeInclusive<u64>,
tx_range: &RangeInclusive<u64>,
) -> Result<SnapshotJarProvider<'_>, reth_interfaces::RethError> {
let key = (*block_range.end(), segment);
if let Some(jar) = self.map.get(&key) {
return Ok(jar.into())
}
if let Some(path) = &path {
self.map.insert(key, LoadedJar::new(NippyJar::load(path)?)?);
Ok(jar.into())
} else {
path = Some(self.path.join(segment.filename(
&((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)),
)));
self.map.insert(
key,
LoadedJar::new(NippyJar::load(
&self.path.join(segment.filename(block_range, tx_range)),
)?)?,
);
Ok(self.map.get(&key).expect("qed").into())
}
}
self.get_segment_provider(segment, block, path)
/// Gets a snapshot segment's block range and transaction range from the provider inner block
/// index.
fn get_segment_ranges_from_block(
&self,
segment: SnapshotSegment,
block: u64,
) -> Option<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)> {
let snapshots = self.snapshots_block_index.read();
let segment_snapshots = snapshots.get(&segment)?;
// It's more probable that the request comes from a newer block height, so we iterate
// the snapshots in reverse.
let mut snapshots_rev_iter = segment_snapshots.iter().rev().peekable();
while let Some((block_end, tx_range)) = snapshots_rev_iter.next() {
// `unwrap_or(0) is safe here as it sets block_start to 0 if the iterator is empty,
// indicating the lowest height snapshot has been reached.
let block_start =
snapshots_rev_iter.peek().map(|(block_end, _)| *block_end + 1).unwrap_or(0);
if block_start <= block {
return Some((block_start..=*block_end, tx_range.clone()));
}
}
None
}
/// Gets a snapshot segment's block range and transaction range from the provider inner
/// transaction index.
fn get_segment_ranges_from_transaction(
&self,
segment: SnapshotSegment,
tx: u64,
) -> Option<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)> {
let snapshots = self.snapshots_tx_index.read();
let segment_snapshots = snapshots.get(&segment)?;
// It's more probable that the request comes from a newer tx height, so we iterate
// the snapshots in reverse.
let mut snapshots_rev_iter = segment_snapshots.iter().rev().peekable();
while let Some((tx_end, block_range)) = snapshots_rev_iter.next() {
let tx_start = snapshots_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
if tx_start <= tx {
return Some((block_range.clone(), tx_start..=*tx_end));
}
}
None
}
/// Gets the highest snapshot if it exists for a snapshot segment.
@ -71,23 +206,72 @@ impl SnapshotProvider {
.as_ref()
.and_then(|tracker| tracker.borrow().and_then(|highest| highest.highest(segment)))
}
/// Iterates through segment snapshots in reverse order, executing a function until it returns
/// some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
pub fn find_snapshot<T>(
&self,
segment: SnapshotSegment,
func: impl Fn(SnapshotJarProvider<'_>) -> RethResult<Option<T>>,
) -> RethResult<Option<T>> {
let snapshots = self.snapshots_block_index.read();
if let Some(segment_snapshots) = snapshots.get(&segment) {
// It's more probable that the request comes from a newer block height, so we iterate
// the snapshots in reverse.
let mut snapshots_rev_iter = segment_snapshots.iter().rev().peekable();
while let Some((block_end, tx_range)) = snapshots_rev_iter.next() {
// `unwrap_or(0) is safe here as it sets block_start to 0 if the iterator
// is empty, indicating the lowest height snapshot has been reached.
let block_start =
snapshots_rev_iter.peek().map(|(block_end, _)| *block_end + 1).unwrap_or(0);
if let Some(res) = func(self.get_or_create_jar_provider(
segment,
&(block_start..=*block_end),
tx_range,
)?)? {
return Ok(Some(res))
}
}
}
Ok(None)
}
}
impl HeaderProvider for SnapshotProvider {
fn header(&self, _block_hash: &BlockHash) -> RethResult<Option<Header>> {
todo!()
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
self.find_snapshot(SnapshotSegment::Headers, |jar_provider| {
Ok(jar_provider
.cursor()?
.get_two::<HeaderMask<Header, BlockHash>>(block_hash.into())?
.and_then(|(header, hash)| {
if &hash == block_hash {
return Some(header)
}
None
}))
})
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num)
self.get_segment_provider_from_block(SnapshotSegment::Headers, num, None)?
.header_by_number(num)
}
fn header_td(&self, _block_hash: &BlockHash) -> RethResult<Option<U256>> {
todo!()
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
self.find_snapshot(SnapshotSegment::Headers, |jar_provider| {
Ok(jar_provider
.cursor()?
.get_two::<HeaderMask<CompactU256, BlockHash>>(block_hash.into())?
.and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
})
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
todo!();
fn header_td_by_number(&self, num: BlockNumber) -> RethResult<Option<U256>> {
self.get_segment_provider_from_block(SnapshotSegment::Headers, num, None)?
.header_td_by_number(num)
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
@ -101,14 +285,15 @@ impl HeaderProvider for SnapshotProvider {
todo!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
todo!();
fn sealed_header(&self, num: BlockNumber) -> RethResult<Option<SealedHeader>> {
self.get_segment_provider_from_block(SnapshotSegment::Headers, num, None)?
.sealed_header(num)
}
}
impl BlockHashReader for SnapshotProvider {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
fn block_hash(&self, num: u64) -> RethResult<Option<B256>> {
self.get_segment_provider_from_block(SnapshotSegment::Headers, num, None)?.block_hash(num)
}
fn canonical_hashes_range(
@ -139,26 +324,42 @@ impl BlockNumReader for SnapshotProvider {
}
impl TransactionsProvider for SnapshotProvider {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
fn transaction_id(&self, tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
self.find_snapshot(SnapshotSegment::Transactions, |jar_provider| {
let mut cursor = jar_provider.cursor()?;
if cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>((&tx_hash).into())?
.and_then(|tx| (tx.hash() == tx_hash).then_some(tx))
.is_some()
{
Ok(Some(cursor.number()))
} else {
Ok(None)
}
})
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
// TODO `num` is provided after checking the index
let block_num = num;
self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)?
self.get_segment_provider_from_transaction(SnapshotSegment::Transactions, num, None)?
.transaction_by_id(num)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
num: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
self.get_segment_provider_from_transaction(SnapshotSegment::Transactions, num, None)?
.transaction_by_id_no_hash(num)
}
fn transaction_by_hash(&self, _hash: TxHash) -> RethResult<Option<TransactionSigned>> {
todo!()
fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
self.find_snapshot(SnapshotSegment::Transactions, |jar_provider| {
Ok(jar_provider
.cursor()?
.get_one::<TransactionMask<TransactionSignedNoHash>>((&hash).into())?
.map(|tx| tx.with_hash())
.and_then(|tx| (tx.hash_ref() == &hash).then_some(tx)))
})
}
fn transaction_by_hash_with_meta(
@ -197,7 +398,7 @@ impl TransactionsProvider for SnapshotProvider {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
fn transaction_sender(&self, id: TxNumber) -> RethResult<Option<Address>> {
Ok(self.transaction_by_id_no_hash(id)?.and_then(|tx| tx.recover_signer()))
}
}

View File

@ -66,7 +66,8 @@ mod test {
// Data sources
let db = create_test_rw_db();
let factory = ProviderFactory::new(&db, MAINNET.clone());
let snap_file = tempfile::NamedTempFile::new().unwrap();
let snap_path = tempfile::tempdir().unwrap();
let snap_file = snap_path.path().join(SnapshotSegment::Headers.filename(&range, &range));
// Setup data
let mut headers = random_header_range(
@ -96,7 +97,7 @@ mod test {
let with_compression = true;
let with_filter = true;
let mut nippy_jar = NippyJar::new(3, snap_file.path(), segment_header);
let mut nippy_jar = NippyJar::new(3, snap_file.as_path(), segment_header);
if with_compression {
nippy_jar = nippy_jar.with_zstd(false, 0);
@ -134,9 +135,9 @@ mod test {
// Use providers to query Header data and compare if it matches
{
let db_provider = factory.provider().unwrap();
let manager = SnapshotProvider::default();
let manager = SnapshotProvider::new(snap_path.path());
let jar_provider = manager
.get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into()))
.get_segment_provider_from_block(SnapshotSegment::Headers, 0, Some(&snap_file))
.unwrap();
assert!(!headers.is_empty());