feat: add SnapshotCursor wrapper and impl HeaderProvider (#5170)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
joshieDo
2023-10-30 11:58:03 +00:00
committed by GitHub
parent d51bc45bad
commit 74a2bf38dd
6 changed files with 233 additions and 72 deletions

View File

@ -1,13 +1,15 @@
//! reth's snapshot creation from database tables
//! reth's snapshot creation from database tables and access
use crate::{
abstraction::cursor::DbCursorRO,
table::{Key, Table},
table::{Decompress, Key, Table},
transaction::DbTx,
RawKey, RawTable,
};
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use derive_more::{Deref, DerefMut};
use reth_interfaces::{RethError, RethResult};
use reth_nippy_jar::{ColumnResult, MmapHandle, NippyJar, NippyJarCursor, PHFKey};
use reth_primitives::{snapshot::SegmentHeader, B256};
use reth_tracing::tracing::*;
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, ops::RangeInclusive};
@ -102,3 +104,117 @@ macro_rules! generate_snapshot_func {
}
generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);
/// Cursor of a snapshot segment.
///
/// Newtype around a [`NippyJarCursor`] whose user header is a [`SegmentHeader`]. The derived
/// `Deref`/`DerefMut` forward to the wrapped cursor, so its methods can be called directly on a
/// [`SnapshotCursor`].
#[derive(Debug, Deref, DerefMut)]
pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> SnapshotCursor<'a> {
    /// Builds a [`SnapshotCursor`] over `jar` using the provided `mmap_handle`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`NippyJarCursor::with_handle`].
    pub fn new(
        jar: &'a NippyJar<SegmentHeader>,
        mmap_handle: MmapHandle,
    ) -> Result<Self, RethError> {
        let inner = NippyJarCursor::with_handle(jar, mmap_handle)?;
        Ok(Self(inner))
    }

    /// Fetches the raw column slices of one row.
    ///
    /// `SELECTOR` and `COLUMNS` are forwarded unchanged to the wrapped cursor's
    /// `row_by_key_with_cols` / `row_by_number_with_cols`.
    ///
    /// For [`KeyOrNumber::Number`], the segment header's `start()` is subtracted from the number
    /// to obtain the row index; a number below `start()` yields `Ok(None)`.
    pub fn get<const SELECTOR: usize, const COLUMNS: usize>(
        &mut self,
        key_or_num: KeyOrNumber<'_>,
    ) -> RethResult<Option<Vec<&'_ [u8]>>> {
        let columns = match key_or_num {
            KeyOrNumber::Hash(key) => self.row_by_key_with_cols::<SELECTOR, COLUMNS>(key)?,
            KeyOrNumber::Number(number) => {
                let first = self.jar().user_header().start();
                if number < first {
                    // The requested number precedes the first one of this segment.
                    return Ok(None)
                }
                let index = (number - first) as usize;
                self.row_by_number_with_cols::<SELECTOR, COLUMNS>(index)?
            }
        };

        Ok(columns)
    }

    /// Fetches a row and decompresses its first selected column into `T`.
    ///
    /// Returns `Ok(None)` when [`Self::get`] finds no row.
    pub fn get_one<T: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
        &mut self,
        key_or_num: KeyOrNumber<'_>,
    ) -> RethResult<Option<T>> {
        let columns = match self.get::<SELECTOR, COLUMNS>(key_or_num)? {
            Some(columns) => columns,
            None => return Ok(None),
        };

        let first = T::decompress(columns[0])?;
        Ok(Some(first))
    }

    /// Fetches a row and decompresses its first two selected columns into `(T, K)`.
    ///
    /// Returns `Ok(None)` when [`Self::get`] finds no row.
    pub fn get_two<T: Decompress, K: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
        &mut self,
        key_or_num: KeyOrNumber<'_>,
    ) -> RethResult<Option<(T, K)>> {
        let columns = match self.get::<SELECTOR, COLUMNS>(key_or_num)? {
            Some(columns) => columns,
            None => return Ok(None),
        };

        let first = T::decompress(columns[0])?;
        let second = K::decompress(columns[1])?;
        Ok(Some((first, second)))
    }

    /// Fetches a row and decompresses its first three selected columns into `(T, K, J)`.
    ///
    /// Returns `Ok(None)` when [`Self::get`] finds no row.
    pub fn get_three<
        T: Decompress,
        K: Decompress,
        J: Decompress,
        const SELECTOR: usize,
        const COLUMNS: usize,
    >(
        &mut self,
        key_or_num: KeyOrNumber<'_>,
    ) -> RethResult<Option<(T, K, J)>> {
        let columns = match self.get::<SELECTOR, COLUMNS>(key_or_num)? {
            Some(columns) => columns,
            None => return Ok(None),
        };

        let first = T::decompress(columns[0])?;
        let second = K::decompress(columns[1])?;
        let third = J::decompress(columns[2])?;
        Ok(Some((first, second, third)))
    }
}
/// Either a key _or_ a block number
///
/// Both variants only hold `Copy` data (a borrowed slice or a `u64`), so the enum itself is
/// `Copy`: the same lookup key can be passed by value to several calls without being rebuilt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyOrNumber<'a> {
    /// A slice used as a key. Usually a block hash
    Hash(&'a [u8]),
    /// A block number
    Number(u64),
}
impl<'a> From<&'a B256> for KeyOrNumber<'a> {
fn from(value: &'a B256) -> Self {
KeyOrNumber::Hash(value.as_slice())
}
}
impl<'a> From<u64> for KeyOrNumber<'a> {
fn from(value: u64) -> Self {
KeyOrNumber::Number(value)
}
}
/// Total number of columns in the snapshot segment.
pub const HEADER_COLUMNS: usize = 3;

/// Selector mask for the header (`0b001`).
pub const S_HEADER: usize = 1 << 0;

/// Selector mask for the header td (`0b010`).
pub const S_HEADER_TD: usize = 1 << 1;

/// Selector mask for the header hash (`0b100`).
pub const S_HEADER_HASH: usize = 1 << 2;

/// Selector mask for the header td together with the header hash (`0b110`).
pub const S_HEADER_TD_WITH_HASH: usize = S_HEADER_TD | S_HEADER_HASH;

/// Selector mask for the header together with the header hash (`0b101`).
pub const S_HEADER_WITH_HASH: usize = S_HEADER | S_HEADER_HASH;

View File

@ -58,6 +58,10 @@ where
})
}
/// Returns the [`NippyJar`] reference held by this cursor.
pub fn jar(&self) -> &NippyJar<H> {
self.jar
}
/// Resets cursor to the beginning.
pub fn reset(&mut self) {
self.row = 0;

View File

@ -1,17 +1,19 @@
use super::LoadedJarRef;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_db::{
snapshot::{
SnapshotCursor, HEADER_COLUMNS, S_HEADER, S_HEADER_HASH, S_HEADER_TD,
S_HEADER_TD_WITH_HASH, S_HEADER_WITH_HASH,
},
table::{Decompress, Table},
HeaderTD,
CanonicalHeaders, HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::NippyJarCursor;
use reth_primitives::{
snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::ops::{Deref, RangeBounds};
use std::ops::{Deref, Range, RangeBounds};
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
@ -32,84 +34,93 @@ impl<'a> From<LoadedJarRef<'a>> for SnapshotJarProvider<'a> {
impl<'a> SnapshotJarProvider<'a> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
pub fn cursor<'b>(&'b self) -> RethResult<SnapshotCursor<'a>>
where
'b: 'a,
{
Ok(NippyJarCursor::with_handle(self.value(), self.mmap_handle())?)
SnapshotCursor::new(self.value(), self.mmap_handle())
}
}
impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
// WIP
let mut cursor = self.cursor()?;
let header = Header::decompress(
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
)
.unwrap();
if &header.hash_slow() == block_hash {
return Ok(Some(header))
} else {
// check next snapshot
}
Ok(None)
Ok(self
.cursor()?
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.map(|(header, _)| header))
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()?
.row_by_number_with_cols::<0b01, 2>(
(num - self.user_header().block_start()) as usize,
)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
.map_err(Into::into)
self.cursor()?.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())
}
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
// WIP
let mut cursor = NippyJarCursor::with_handle(self.value(), self.mmap_handle())?;
Ok(self
.cursor()?
.get_two::<<HeaderTD as Table>::Value, <CanonicalHeaders as Table>::Value, S_HEADER_TD_WITH_HASH, HEADER_COLUMNS>(
block_hash.into(),
)?
.filter(|(_, hash)| hash == block_hash)
.map(|(td, _)| td.into()))
}
let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
/// Reads the `S_HEADER_TD` column for block `num` and converts it into a [`U256`].
///
/// Returns `Ok(None)` when the cursor yields no row for `num`.
fn header_td_by_number(&self, num: BlockNumber) -> RethResult<Option<U256>> {
    let mut cursor = self.cursor()?;
    let td = cursor
        .get_one::<<HeaderTD as Table>::Value, S_HEADER_TD, HEADER_COLUMNS>(num.into())?;

    Ok(td.map(Into::into))
}
let header = Header::decompress(row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(row[1]).unwrap();
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
let range = to_range(range);
if &header.hash_slow() == block_hash {
return Ok(Some(td.0))
} else {
// check next snapshot
let mut cursor = self.cursor()?;
let mut headers = Vec::with_capacity((range.end - range.start) as usize);
for num in range.start..range.end {
match cursor.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())? {
Some(header) => headers.push(header),
None => return Ok(headers),
}
}
Ok(None)
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
unimplemented!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
unimplemented!();
Ok(headers)
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
unimplemented!();
let range = to_range(range);
let mut cursor = self.cursor()?;
let mut headers = Vec::with_capacity((range.end - range.start) as usize);
for number in range.start..range.end {
match cursor
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())?
{
Some((header, hash)) => headers.push(header.seal(hash)),
None => return Ok(headers),
}
}
Ok(headers)
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
unimplemented!();
/// Reads the header and hash columns for block `number` and seals the header with that hash.
///
/// Returns `Ok(None)` when the cursor yields no row for `number`.
fn sealed_header(&self, number: BlockNumber) -> RethResult<Option<SealedHeader>> {
    let mut cursor = self.cursor()?;
    let row = cursor
        .get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(
            number.into(),
        )?;

    Ok(row.map(|(header, hash)| header.seal(hash)))
}
}
impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
fn block_hash(&self, number: u64) -> RethResult<Option<B256>> {
self.cursor()?.get_one::<<CanonicalHeaders as Table>::Value, S_HEADER_HASH, HEADER_COLUMNS>(
number.into(),
)
}
fn canonical_hashes_range(
@ -148,7 +159,7 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
TransactionSignedNoHash::decompress(
self.cursor()?
.row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
.ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
@ -220,3 +231,19 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
todo!()
}
}
/// Converts any `RangeBounds<u64>` into a half-open `Range<u64>`.
///
/// An unbounded start becomes `0` and an unbounded end becomes `u64::MAX`. An excluded start or
/// an included end is shifted by one to match the half-open form.
///
/// The shift saturates at `u64::MAX` instead of overflowing, so bounds such as `..=u64::MAX` no
/// longer panic in debug builds or wrap around to a bogus range in release builds.
fn to_range<R: RangeBounds<u64>>(bounds: R) -> Range<u64> {
    use std::ops::Bound;

    let start = match bounds.start_bound() {
        Bound::Included(&v) => v,
        Bound::Excluded(&v) => v.saturating_add(1),
        Bound::Unbounded => 0,
    };

    let end = match bounds.end_bound() {
        Bound::Included(&v) => v.saturating_add(1),
        Bound::Excluded(&v) => v,
        Bound::Unbounded => u64::MAX,
    };

    start..end
}

View File

@ -46,7 +46,7 @@ mod test {
use reth_db::{
cursor::DbCursorRO,
database::Database,
snapshot::create_snapshot_T1_T2,
snapshot::create_snapshot_T1_T2_T3,
test_utils::create_test_rw_db,
transaction::{DbTx, DbTxMut},
CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable,
@ -60,7 +60,8 @@ mod test {
// Ranges
let row_count = 100u64;
let range = 0..=(row_count - 1);
let segment_header = SegmentHeader::new(range.clone(), range.clone());
let segment_header =
SegmentHeader::new(range.clone(), range.clone(), SnapshotSegment::Headers);
// Data sources
let db = create_test_rw_db();
@ -95,7 +96,7 @@ mod test {
let with_compression = true;
let with_filter = true;
let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header);
let mut nippy_jar = NippyJar::new(3, snap_file.path(), segment_header);
if with_compression {
nippy_jar = nippy_jar.with_zstd(false, 0);
@ -118,14 +119,14 @@ mod test {
.unwrap()
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into()));
create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber, SegmentHeader>(
&tx,
range,
None,
none_vec,
Some(hashes),
row_count as usize,
&mut nippy_jar,
create_snapshot_T1_T2_T3::<
Headers,
HeaderTD,
CanonicalHeaders,
BlockNumber,
SegmentHeader,
>(
&tx, range, None, none_vec, Some(hashes), row_count as usize, &mut nippy_jar
)
.unwrap();
}