feat: share mmap handle between snapshot providers & cursors (#5162)

Author: joshieDo
Date: 2023-10-27 11:27:36 +01:00
Committed by: GitHub
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Parent: 006259b4f6
Commit: a2323c90c2
6 changed files with 614 additions and 522 deletions
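In short: instead of every `NippyJarCursor::new` opening the data file and creating a fresh mmap, the jar now exposes `open_data()`, which returns a cloneable `MmapHandle`, and `NippyJarCursor::with_handle` builds a cursor on top of an existing handle. A minimal sketch of the intended usage, based only on the APIs added in this diff; the helper function name is hypothetical and `SegmentHeader` is simply the header type the snapshot providers use:

use reth_nippy_jar::{MmapHandle, NippyJar, NippyJarCursor, NippyJarError};
use reth_primitives::snapshot::SegmentHeader;

/// Hypothetical helper: open the data file once and share the mapping between
/// two cursors instead of calling `File::open` + `Mmap::map` per cursor.
fn cursors_sharing_one_mmap(
    jar: &NippyJar<SegmentHeader>,
) -> Result<(NippyJarCursor<'_, SegmentHeader>, NippyJarCursor<'_, SegmentHeader>), NippyJarError> {
    // One mmap for the whole jar, kept alive by the `Arc`s inside the handle.
    let handle: MmapHandle = jar.open_data()?;

    // Cloning only bumps reference counts; no new file descriptor or mapping is created.
    let first = NippyJarCursor::with_handle(jar, handle.clone())?;
    let second = NippyJarCursor::with_handle(jar, handle)?;
    Ok((first, second))
}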


@@ -1,10 +1,9 @@
use crate::{
compression::{Compression, Compressors, Zstd},
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
InclusionFilter, MmapHandle, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
};
use memmap2::Mmap;
use serde::{de::Deserialize, ser::Serialize};
use std::{fs::File, ops::Range, sync::Arc};
use std::ops::Range;
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;
@@ -14,10 +13,7 @@ pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Data file.
#[allow(unused)]
file_handle: Arc<File>,
/// Data file.
mmap_handle: Arc<Mmap>,
mmap_handle: MmapHandle,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
internal_buffer: Vec<u8>,
/// Cursor row position.
@@ -38,16 +34,24 @@ where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static,
{
pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
let file = File::open(jar.data_path())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
file_handle: Arc::new(file),
mmap_handle: Arc::new(mmap),
mmap_handle: jar.open_data()?,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}
pub fn with_handle(
jar: &'a NippyJar<H>,
mmap_handle: MmapHandle,
) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
mmap_handle,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,


@@ -10,6 +10,7 @@
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
clone::Clone,
@@ -17,7 +18,9 @@ use std::{
fs::File,
io::{Seek, Write},
marker::Sync,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};
use sucds::{
int_vectors::PrefixSummedEliasFano,
@@ -247,6 +250,11 @@ where
.join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy()))
}
/// Returns a [`MmapHandle`] of the data file
pub fn open_data(&self) -> Result<MmapHandle, NippyJarError> {
MmapHandle::new(self.data_path())
}
/// If required, prepares any compression algorithm for an early pass of the data.
pub fn prepare_compression(
&mut self,
@@ -487,6 +495,34 @@ where
}
}
/// Holds an `Arc` over a file and its associated mmap handle.
#[derive(Debug, Clone)]
pub struct MmapHandle {
/// File descriptor. Needs to be kept alive as long as the mmap handle.
#[allow(unused)]
file: Arc<File>,
/// Mmap handle.
mmap: Arc<Mmap>,
}
impl MmapHandle {
pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
let file = File::open(path)?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
Ok(Self { file: Arc::new(file), mmap: Arc::new(mmap) })
}
}
impl Deref for MmapHandle {
type Target = Mmap;
fn deref(&self) -> &Self::Target {
&self.mmap
}
}
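A quick usage sketch for the new handle type; the file path is hypothetical. `MmapHandle::new` maps the file once, clones only bump the inner `Arc`s, and the `Deref` impl above exposes the mapped bytes through `Mmap`:

use reth_nippy_jar::{MmapHandle, NippyJarError};

/// Illustration only: the path below is hypothetical.
fn inspect_data_file() -> Result<(), NippyJarError> {
    // Maps the file once; both the `File` and the `Mmap` sit behind `Arc`s.
    let handle = MmapHandle::new("snapshots/headers.dat")?;

    // Deref goes `MmapHandle` -> `Mmap` -> `[u8]`, so slice methods work directly.
    let prefix: Vec<u8> = handle.iter().take(8).copied().collect();
    println!("{} mapped bytes, prefix {prefix:?}", handle.len());

    // A clone shares the same mapping and can be handed to another cursor cheaply.
    let _shared = handle.clone();
    Ok(())
}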
#[cfg(test)]
mod tests {
use super::*;


@@ -1,508 +0,0 @@
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use dashmap::DashMap;
use reth_db::{
table::{Decompress, Table},
HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_primitives::{
snapshot::{SegmentHeader, BLOCKS_PER_SNAPSHOT},
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
};
use std::{ops::RangeBounds, path::PathBuf};
/// Alias type for each specific `NippyJar`.
type NippyJarRef<'a> =
dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), NippyJar<SegmentHeader>>;
/// SnapshotProvider
#[derive(Debug, Default)]
pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), NippyJar<SegmentHeader>>,
}
impl SnapshotProvider {
/// Gets the provider of the requested segment and range.
pub fn get_segment_provider(
&self,
segment: SnapshotSegment,
block: BlockNumber,
mut path: Option<PathBuf>,
) -> RethResult<SnapshotJarProvider<'_>> {
// TODO this invalidates custom length snapshots.
let snapshot = block / BLOCKS_PER_SNAPSHOT;
let key = (snapshot, segment);
if let Some(jar) = self.map.get(&key) {
return Ok(SnapshotJarProvider { jar })
}
if let Some(path) = &path {
let jar = NippyJar::load(path)?;
self.map.insert(key, jar);
} else {
path = Some(segment.filename(
&((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)),
));
}
self.get_segment_provider(segment, block, path)
}
}
impl HeaderProvider for SnapshotProvider {
fn header(&self, _block_hash: &BlockHash) -> RethResult<Option<Header>> {
todo!()
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num)
}
fn header_td(&self, _block_hash: &BlockHash) -> RethResult<Option<U256>> {
todo!()
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
todo!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
todo!();
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
todo!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
todo!();
}
}
impl BlockHashReader for SnapshotProvider {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl BlockNumReader for SnapshotProvider {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl TransactionsProvider for SnapshotProvider {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
// TODO `num` is provided after checking the index
let block_num = num;
self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)?
.transaction_by_id(num)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, _hash: TxHash) -> RethResult<Option<TransactionSigned>> {
todo!()
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
pub struct SnapshotJarProvider<'a> {
/// Reference to a value on [`SnapshotProvider`]
pub jar: NippyJarRef<'a>,
}
impl<'a> SnapshotJarProvider<'a> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
where
'b: 'a,
{
Ok(NippyJarCursor::new(self.jar.value())?)
}
}
impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
// WIP
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let header = Header::decompress(
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
)
.unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(header))
} else {
// check next snapshot
}
Ok(None)
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
NippyJarCursor::new(self.jar.value())?
.row_by_number_with_cols::<0b01, 2>(
(num - self.jar.user_header().block_start()) as usize,
)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
.map_err(Into::into)
}
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
// WIP
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
let header = Header::decompress(row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(row[1]).unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(td.0))
} else {
// check next snapshot
}
Ok(None)
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
unimplemented!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
unimplemented!();
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
unimplemented!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
unimplemented!();
}
}
impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl<'a> BlockNumReader for SnapshotJarProvider<'a> {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
NippyJarCursor::new(self.jar.value())?
.row_by_number_with_cols::<0b1, 1>(
(num - self.jar.user_header().tx_start()) as usize,
)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
.map_err(Into::into)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
// WIP
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
)
.unwrap()
.with_hash();
if tx.hash() == hash {
return Ok(Some(tx))
} else {
// check next snapshot
}
Ok(None)
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::ProviderFactory;
use rand::{self, seq::SliceRandom};
use reth_db::{
cursor::DbCursorRO,
database::Database,
snapshot::create_snapshot_T1_T2,
test_utils::create_test_rw_db,
transaction::{DbTx, DbTxMut},
CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable,
};
use reth_interfaces::test_utils::generators::{self, random_header_range};
use reth_nippy_jar::NippyJar;
use reth_primitives::{B256, MAINNET};
#[test]
fn test_snap() {
// Ranges
let row_count = 100u64;
let range = 0..=(row_count - 1);
let segment_header = SegmentHeader::new(range.clone(), range.clone());
// Data sources
let db = create_test_rw_db();
let factory = ProviderFactory::new(&db, MAINNET.clone());
let snap_file = tempfile::NamedTempFile::new().unwrap();
// Setup data
let mut headers = random_header_range(
&mut generators::rng(),
*range.start()..(*range.end() + 1),
B256::random(),
);
db.update(|tx| -> Result<(), DatabaseError> {
let mut td = U256::ZERO;
for header in headers.clone() {
td += header.header.difficulty;
let hash = header.hash();
tx.put::<CanonicalHeaders>(header.number, hash)?;
tx.put::<Headers>(header.number, header.clone().unseal())?;
tx.put::<HeaderTD>(header.number, td.into())?;
tx.put::<HeaderNumbers>(hash, header.number)?;
}
Ok(())
})
.unwrap()
.unwrap();
// Create Snapshot
{
let with_compression = true;
let with_filter = true;
let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header);
if with_compression {
nippy_jar = nippy_jar.with_zstd(false, 0);
}
if with_filter {
nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
}
let tx = db.tx().unwrap();
// Hacky type inference. TODO fix
let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
let _ = none_vec.take();
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>().unwrap();
let hashes = cursor
.walk(None)
.unwrap()
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber, SegmentHeader>(
&tx,
range,
None,
none_vec,
Some(hashes),
row_count as usize,
&mut nippy_jar,
)
.unwrap();
}
// Use providers to query Header data and check that it matches
{
let db_provider = factory.provider().unwrap();
let manager = SnapshotProvider::default();
let jar_provider = manager
.get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into()))
.unwrap();
assert!(!headers.is_empty());
// Shuffled for chaos.
headers.shuffle(&mut generators::rng());
for header in headers {
let header_hash = header.hash();
let header = header.unseal();
// Compare Header
assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
// Compare HeaderTD
assert_eq!(
db_provider.header_td(&header_hash).unwrap().unwrap(),
jar_provider.header_td(&header_hash).unwrap().unwrap()
);
}
}
}
}


@@ -0,0 +1,222 @@
use super::LoadedJarRef;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_db::{
table::{Decompress, Table},
HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::NippyJarCursor;
use reth_primitives::{
snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
};
use std::ops::{Deref, RangeBounds};
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
pub struct SnapshotJarProvider<'a>(LoadedJarRef<'a>);
impl<'a> Deref for SnapshotJarProvider<'a> {
type Target = LoadedJarRef<'a>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> From<LoadedJarRef<'a>> for SnapshotJarProvider<'a> {
fn from(value: LoadedJarRef<'a>) -> Self {
SnapshotJarProvider(value)
}
}
impl<'a> SnapshotJarProvider<'a> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
where
'b: 'a,
{
Ok(NippyJarCursor::with_handle(self.value(), self.mmap_handle())?)
}
}
impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
// WIP
let mut cursor = self.cursor()?;
let header = Header::decompress(
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
)
.unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(header))
} else {
// check next snapshot
}
Ok(None)
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()?
.row_by_number_with_cols::<0b01, 2>(
(num - self.user_header().block_start()) as usize,
)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
.map_err(Into::into)
}
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
// WIP
let mut cursor = NippyJarCursor::with_handle(self.value(), self.mmap_handle())?;
let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
let header = Header::decompress(row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(row[1]).unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(td.0))
} else {
// check next snapshot
}
Ok(None)
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
unimplemented!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
unimplemented!();
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
unimplemented!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
unimplemented!();
}
}
impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl<'a> BlockNumReader for SnapshotJarProvider<'a> {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
self.cursor()?
.row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
.map_err(Into::into)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
// WIP
let mut cursor = self.cursor()?;
let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
)
.unwrap()
.with_hash();
if tx.hash() == hash {
return Ok(Some(tx))
} else {
// check next snapshot
}
Ok(None)
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}


@@ -0,0 +1,176 @@
use super::{LoadedJar, SnapshotJarProvider};
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use dashmap::DashMap;
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::BLOCKS_PER_SNAPSHOT, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo,
Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::{ops::RangeBounds, path::PathBuf};
/// Provider over all snapshot segments; keeps loaded `NippyJar`s (and their mmap handles) cached per segment and block range.
#[derive(Debug, Default)]
pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>,
}
impl SnapshotProvider {
/// Gets the provider of the requested segment and range.
pub fn get_segment_provider(
&self,
segment: SnapshotSegment,
block: BlockNumber,
mut path: Option<PathBuf>,
) -> RethResult<SnapshotJarProvider<'_>> {
// TODO this invalidates custom length snapshots.
let snapshot = block / BLOCKS_PER_SNAPSHOT;
let key = (snapshot, segment);
if let Some(jar) = self.map.get(&key) {
return Ok(jar.into())
}
if let Some(path) = &path {
self.map.insert(key, LoadedJar::new(NippyJar::load(path)?)?);
} else {
path = Some(segment.filename(
&((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)),
));
}
self.get_segment_provider(segment, block, path)
}
}
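A worked example of the lookup arithmetic in `get_segment_provider` above; the concrete value of `BLOCKS_PER_SNAPSHOT` is an assumption here, only the division and range construction mirror the code:

// Assumed value for illustration; the real constant lives in `reth_primitives::snapshot`.
const BLOCKS_PER_SNAPSHOT: u64 = 500_000;

fn snapshot_key_and_range(block: u64) -> (u64, std::ops::RangeInclusive<u64>) {
    let snapshot = block / BLOCKS_PER_SNAPSHOT;
    let range = (snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1);
    (snapshot, range)
}

// snapshot_key_and_range(1_234_567) == (2, 1_000_000..=1_499_999)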
impl HeaderProvider for SnapshotProvider {
fn header(&self, _block_hash: &BlockHash) -> RethResult<Option<Header>> {
todo!()
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num)
}
fn header_td(&self, _block_hash: &BlockHash) -> RethResult<Option<U256>> {
todo!()
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
todo!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
todo!();
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
todo!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
todo!();
}
}
impl BlockHashReader for SnapshotProvider {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl BlockNumReader for SnapshotProvider {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl TransactionsProvider for SnapshotProvider {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
// TODO `num` is provided after checking the index
let block_num = num;
self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)?
.transaction_by_id(num)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, _hash: TxHash) -> RethResult<Option<TransactionSigned>> {
todo!()
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}


@@ -0,0 +1,162 @@
mod manager;
pub use manager::SnapshotProvider;
mod jar;
pub use jar::SnapshotJarProvider;
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{snapshot::SegmentHeader, SnapshotSegment};
use std::ops::Deref;
/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), LoadedJar>;
/// Helper type that bundles a `NippyJar` with its mmap handle so that cursors created from it reuse the same mapping.
#[derive(Debug)]
pub struct LoadedJar {
jar: NippyJar<SegmentHeader>,
mmap_handle: reth_nippy_jar::MmapHandle,
}
impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> RethResult<Self> {
let mmap_handle = jar.open_data()?;
Ok(Self { jar, mmap_handle })
}
/// Returns a clone of the mmap handle that can be used to instantiate a cursor.
fn mmap_handle(&self) -> reth_nippy_jar::MmapHandle {
self.mmap_handle.clone()
}
}
impl Deref for LoadedJar {
type Target = NippyJar<SegmentHeader>;
fn deref(&self) -> &Self::Target {
&self.jar
}
}
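A module-local sketch of how the cached handle is reused; this is essentially what `SnapshotJarProvider::cursor` does through the map reference, and the helper below is hypothetical:

/// Hypothetical module-local helper: two cursors over one cached mapping.
fn two_cursors(loaded: &LoadedJar) -> RethResult<()> {
    use reth_nippy_jar::NippyJarCursor;

    // `LoadedJar` derefs to the inner `NippyJar`; `mmap_handle()` clones the shared map.
    let _a = NippyJarCursor::with_handle(loaded, loaded.mmap_handle())?;
    let _b = NippyJarCursor::with_handle(loaded, loaded.mmap_handle())?;
    Ok(())
}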
#[cfg(test)]
mod test {
use super::*;
use crate::{HeaderProvider, ProviderFactory};
use rand::{self, seq::SliceRandom};
use reth_db::{
cursor::DbCursorRO,
database::Database,
snapshot::create_snapshot_T1_T2,
test_utils::create_test_rw_db,
transaction::{DbTx, DbTxMut},
CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable,
};
use reth_interfaces::test_utils::generators::{self, random_header_range};
use reth_nippy_jar::NippyJar;
use reth_primitives::{BlockNumber, B256, MAINNET, U256};
#[test]
fn test_snap() {
// Ranges
let row_count = 100u64;
let range = 0..=(row_count - 1);
let segment_header = SegmentHeader::new(range.clone(), range.clone());
// Data sources
let db = create_test_rw_db();
let factory = ProviderFactory::new(&db, MAINNET.clone());
let snap_file = tempfile::NamedTempFile::new().unwrap();
// Setup data
let mut headers = random_header_range(
&mut generators::rng(),
*range.start()..(*range.end() + 1),
B256::random(),
);
db.update(|tx| -> Result<(), DatabaseError> {
let mut td = U256::ZERO;
for header in headers.clone() {
td += header.header.difficulty;
let hash = header.hash();
tx.put::<CanonicalHeaders>(header.number, hash)?;
tx.put::<Headers>(header.number, header.clone().unseal())?;
tx.put::<HeaderTD>(header.number, td.into())?;
tx.put::<HeaderNumbers>(hash, header.number)?;
}
Ok(())
})
.unwrap()
.unwrap();
// Create Snapshot
{
let with_compression = true;
let with_filter = true;
let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header);
if with_compression {
nippy_jar = nippy_jar.with_zstd(false, 0);
}
if with_filter {
nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
}
let tx = db.tx().unwrap();
// Hacky type inference. TODO fix
let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
let _ = none_vec.take();
// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>().unwrap();
let hashes = cursor
.walk(None)
.unwrap()
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber, SegmentHeader>(
&tx,
range,
None,
none_vec,
Some(hashes),
row_count as usize,
&mut nippy_jar,
)
.unwrap();
}
// Use providers to query Header data and check that it matches
{
let db_provider = factory.provider().unwrap();
let manager = SnapshotProvider::default();
let jar_provider = manager
.get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into()))
.unwrap();
assert!(!headers.is_empty());
// Shuffled for chaos.
headers.shuffle(&mut generators::rng());
for header in headers {
let header_hash = header.hash();
let header = header.unseal();
// Compare Header
assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
// Compare HeaderTD
assert_eq!(
db_provider.header_td(&header_hash).unwrap().unwrap(),
jar_provider.header_td(&header_hash).unwrap().unwrap()
);
}
}
}
}