feat: remove const generics from cursors and add segment masks (#5181)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
joshieDo
2023-10-30 12:24:49 +00:00
committed by GitHub
parent 74a2bf38dd
commit e73ece945d
11 changed files with 313 additions and 238 deletions

View File

@ -0,0 +1,105 @@
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::{RethError, RethResult};
use reth_nippy_jar::{MmapHandle, NippyJar, NippyJarCursor};
use reth_primitives::{snapshot::SegmentHeader, B256};
/// Cursor of a snapshot segment.
///
/// Newtype around a [`NippyJarCursor`] whose user header is a [`SegmentHeader`]. The derived
/// `Deref`/`DerefMut` impls expose the inner cursor's methods directly on this type.
#[derive(Debug, Deref, DerefMut)]
pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> SnapshotCursor<'a> {
/// Returns a new [`SnapshotCursor`].
pub fn new(
jar: &'a NippyJar<SegmentHeader>,
mmap_handle: MmapHandle,
) -> Result<Self, RethError> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
}
/// Gets a row of values.
pub fn get(
&mut self,
key_or_num: KeyOrNumber<'_>,
mask: usize,
) -> RethResult<Option<Vec<&'_ [u8]>>> {
let row = match key_or_num {
KeyOrNumber::Key(k) => self.row_by_key_with_cols(k, mask),
KeyOrNumber::Number(n) => {
let offset = self.jar().user_header().start();
if offset > n {
return Ok(None)
}
self.row_by_number_with_cols((n - offset) as usize, mask)
}
}?;
Ok(row)
}
/// Gets one column value from a row.
pub fn get_one<M: ColumnSelectorOne>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<M::FIRST>> {
let row = self.get(key_or_num, M::MASK)?;
match row {
Some(row) => Ok(Some(M::FIRST::decompress(row[0])?)),
None => Ok(None),
}
}
/// Gets two column values from a row.
pub fn get_two<M: ColumnSelectorTwo>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(M::FIRST, M::SECOND)>> {
let row = self.get(key_or_num, M::MASK)?;
match row {
Some(row) => Ok(Some((M::FIRST::decompress(row[0])?, M::SECOND::decompress(row[1])?))),
None => Ok(None),
}
}
/// Gets three column values from a row.
#[allow(clippy::type_complexity)]
pub fn get_three<M: ColumnSelectorThree>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(M::FIRST, M::SECOND, M::THIRD)>> {
let row = self.get(key_or_num, M::MASK)?;
match row {
Some(row) => Ok(Some((
M::FIRST::decompress(row[0])?,
M::SECOND::decompress(row[1])?,
M::THIRD::decompress(row[2])?,
))),
None => Ok(None),
}
}
}
/// Either a key _or_ a block/tx number, used to address a row of a snapshot segment.
///
/// Values are usually built through the `From<&B256>` and `From<u64>` conversions.
#[derive(Debug)]
pub enum KeyOrNumber<'a> {
/// A slice used as a key. Usually a block/tx hash.
Key(&'a [u8]),
/// A block/tx number.
Number(u64),
}
impl<'a> From<&'a B256> for KeyOrNumber<'a> {
fn from(value: &'a B256) -> Self {
KeyOrNumber::Key(value.as_slice())
}
}
impl<'a> From<u64> for KeyOrNumber<'a> {
fn from(value: u64) -> Self {
KeyOrNumber::Number(value)
}
}

View File

@ -1,15 +1,13 @@
//! reth's snapshot creation from database tables and access
use crate::{
abstraction::cursor::DbCursorRO,
table::{Decompress, Key, Table},
table::{Key, Table},
transaction::DbTx,
RawKey, RawTable,
};
use derive_more::{Deref, DerefMut};
use reth_interfaces::{RethError, RethResult};
use reth_nippy_jar::{ColumnResult, MmapHandle, NippyJar, NippyJarCursor, PHFKey};
use reth_primitives::{snapshot::SegmentHeader, B256};
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use reth_tracing::tracing::*;
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, ops::RangeInclusive};
@ -104,117 +102,3 @@ macro_rules! generate_snapshot_func {
}
generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);
/// Cursor of a snapshot segment.
#[derive(Debug, Deref, DerefMut)]
pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> SnapshotCursor<'a> {
/// Returns a new [`SnapshotCursor`].
pub fn new(
jar: &'a NippyJar<SegmentHeader>,
mmap_handle: MmapHandle,
) -> Result<Self, RethError> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
}
/// Gets a row of values.
pub fn get<const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<Vec<&'_ [u8]>>> {
let row = match key_or_num {
KeyOrNumber::Hash(k) => self.row_by_key_with_cols::<SELECTOR, COLUMNS>(k),
KeyOrNumber::Number(n) => {
let offset = self.jar().user_header().start();
if offset > n {
return Ok(None)
}
self.row_by_number_with_cols::<SELECTOR, COLUMNS>((n - offset) as usize)
}
}?;
Ok(row)
}
/// Gets one column value from a row.
pub fn get_one<T: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<T>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;
match row {
Some(row) => Ok(Some(T::decompress(row[0])?)),
None => Ok(None),
}
}
/// Gets two column values from a row.
pub fn get_two<T: Decompress, K: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(T, K)>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;
match row {
Some(row) => Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?))),
None => Ok(None),
}
}
/// Gets three column values from a row.
pub fn get_three<
T: Decompress,
K: Decompress,
J: Decompress,
const SELECTOR: usize,
const COLUMNS: usize,
>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(T, K, J)>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;
match row {
Some(row) => {
Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?, J::decompress(row[2])?)))
}
None => Ok(None),
}
}
}
/// Either a key _or_ a block number
#[derive(Debug)]
pub enum KeyOrNumber<'a> {
/// A slice used as a key. Usually a block hash
Hash(&'a [u8]),
/// A block number
Number(u64),
}
impl<'a> From<&'a B256> for KeyOrNumber<'a> {
fn from(value: &'a B256) -> Self {
KeyOrNumber::Hash(value.as_slice())
}
}
impl<'a> From<u64> for KeyOrNumber<'a> {
fn from(value: u64) -> Self {
KeyOrNumber::Number(value)
}
}
/// Snapshot segment total columns.
pub const HEADER_COLUMNS: usize = 3;
/// Selector for header.
pub const S_HEADER: usize = 0b001;
/// Selector for header td.
pub const S_HEADER_TD: usize = 0b010;
/// Selector for header hash.
pub const S_HEADER_HASH: usize = 0b100;
/// Selector for header td and header hash.
pub const S_HEADER_TD_WITH_HASH: usize = 0b110;
/// Selector for header and header hash.
pub const S_HEADER_WITH_HASH: usize = 0b101;

View File

@ -0,0 +1,90 @@
use crate::table::Decompress;
/// Generic mask helper struct for selecting specific column values to read and decompress.
///
/// #### Explanation:
///
/// A `NippyJar` snapshot row can contain multiple column values. A mask specifies which of those
/// column values should be read.
///
/// For example, given a snapshot with three columns, querying the first and last columns uses the
/// mask `0b101`. To select only the second column, the mask `0b010` would be used.
///
/// Since each snapshot has its own column distribution, different wrapper types are necessary. For
/// instance, `B256` might be the third column in the `Header` segment, while being the second
/// column in another segment. Hence, `Mask<B256>` would only be applicable to one of these
/// scenarios.
///
/// Together with the column selector traits (e.g. [`ColumnSelectorOne`]), this provides a
/// structured way to tie the types to be decoded to the mask necessary to query them.
#[derive(Debug)]
pub struct Mask<FIRST, SECOND = (), THIRD = ()>(std::marker::PhantomData<(FIRST, SECOND, THIRD)>);
// Generates one `<Segment>Mask` wrapper struct per listed segment name.
//
// Each generated struct wraps a `Mask` with the same three type parameters (the last two default
// to `()`), so that column selectors can be implemented separately for every segment.
macro_rules! add_segments {
($($segment:tt),+) => {
// `paste` is used to concatenate the segment name with the `Mask` suffix.
paste::paste! {
$(
#[doc = concat!("Mask for ", stringify!($segment), " snapshot segment. See [`Mask`] for more.")]
#[derive(Debug)]
pub struct [<$segment Mask>]<FIRST, SECOND = (), THIRD = ()>(Mask<FIRST, SECOND, THIRD>);
)+
}
};
}
// Defines `HeaderMask`, `ReceiptMask` and `TransactionMask`.
add_segments!(Header, Receipt, Transaction);
/// Trait for specifying a mask to select one column value.
///
/// Implementors tie the type to decompress ([`Self::FIRST`]) to the bit mask ([`Self::MASK`])
/// that selects its column.
pub trait ColumnSelectorOne {
/// First desired column value.
type FIRST: Decompress;
/// Mask used to obtain the desired value. It should correspond to the order of columns in the
/// snapshot.
const MASK: usize;
}
/// Trait for specifying a mask to select two column values.
///
/// Implementors tie the types to decompress ([`Self::FIRST`], [`Self::SECOND`]) to the bit mask
/// ([`Self::MASK`]) that selects their columns.
pub trait ColumnSelectorTwo {
/// First desired column value.
type FIRST: Decompress;
/// Second desired column value.
type SECOND: Decompress;
/// Mask used to obtain the desired values. It should correspond to the order of columns in the
/// snapshot.
const MASK: usize;
}
/// Trait for specifying a mask to select three column values.
///
/// Implementors tie the types to decompress ([`Self::FIRST`], [`Self::SECOND`],
/// [`Self::THIRD`]) to the bit mask ([`Self::MASK`]) that selects their columns.
pub trait ColumnSelectorThree {
/// First desired column value.
type FIRST: Decompress;
/// Second desired column value.
type SECOND: Decompress;
/// Third desired column value.
type THIRD: Decompress;
/// Mask used to obtain the desired values. It should correspond to the order of columns in the
/// snapshot.
const MASK: usize;
}
#[macro_export]
/// Add mask to select `N` column values from a specific snapshot segment row.
///
/// The macro has one arm per number of selected columns:
///
/// * `add_snapshot_mask!(MaskStruct, T1, mask)` implements `ColumnSelectorOne` for
///   `MaskStruct<T1>`.
/// * `add_snapshot_mask!(MaskStruct, T1, T2, mask)` implements `ColumnSelectorTwo` for
///   `MaskStruct<T1, T2>`.
/// * `add_snapshot_mask!(MaskStruct, T1, T2, T3, mask)` implements `ColumnSelectorThree` for
///   `MaskStruct<T1, T2, T3>`.
///
/// The selector traits are named without a path in the expansion, so the trait matching the
/// chosen arm must be in scope where the macro is invoked.
macro_rules! add_snapshot_mask {
    // One column value.
    ($mask_struct:tt, $type1:ty, $mask:expr) => {
        impl ColumnSelectorOne for $mask_struct<$type1> {
            type FIRST = $type1;
            const MASK: usize = $mask;
        }
    };
    // Two column values.
    ($mask_struct:tt, $type1:ty, $type2:ty, $mask:expr) => {
        impl ColumnSelectorTwo for $mask_struct<$type1, $type2> {
            type FIRST = $type1;
            type SECOND = $type2;
            const MASK: usize = $mask;
        }
    };
    // Three column values. This must implement `ColumnSelectorThree`: `ColumnSelectorTwo` has no
    // `THIRD` associated type.
    ($mask_struct:tt, $type1:ty, $type2:ty, $type3:ty, $mask:expr) => {
        impl ColumnSelectorThree for $mask_struct<$type1, $type2, $type3> {
            type FIRST = $type1;
            type SECOND = $type2;
            type THIRD = $type3;
            const MASK: usize = $mask;
        }
    };
}

View File

@ -0,0 +1,28 @@
use super::{ReceiptMask, TransactionMask};
use crate::{
add_snapshot_mask,
snapshot::mask::{ColumnSelectorOne, ColumnSelectorTwo, HeaderMask},
table::Table,
CanonicalHeaders, HeaderTD, Receipts, Transactions,
};
use reth_primitives::{BlockHash, Header};
// HEADER MASKS
//
// Each mask below ties the listed value type(s) to the bits selecting their column(s):
// `Header` uses bit 0 (`0b001`), the `HeaderTD` value uses bit 1 (`0b010`) and the block hash
// uses bit 2 (`0b100`). The two-type masks combine the corresponding bits.
add_snapshot_mask!(HeaderMask, Header, 0b001);
add_snapshot_mask!(HeaderMask, <HeaderTD as Table>::Value, 0b010);
add_snapshot_mask!(HeaderMask, BlockHash, 0b100);
add_snapshot_mask!(HeaderMask, Header, BlockHash, 0b101);
add_snapshot_mask!(
HeaderMask,
<HeaderTD as Table>::Value,
<CanonicalHeaders as Table>::Value,
0b110
);
// RECEIPT MASKS
//
// Only a single-column mask (`0b1`) is registered for the `Receipts` value.
add_snapshot_mask!(ReceiptMask, <Receipts as Table>::Value, 0b1);
// TRANSACTION MASKS
//
// Only a single-column mask (`0b1`) is registered for the `Transactions` value.
add_snapshot_mask!(TransactionMask, <Transactions as Table>::Value, 0b1);

View File

@ -0,0 +1,12 @@
//! reth's snapshot database table import and access
// Everything public in `generation` is re-exported from this module.
mod generation;
pub use generation::*;
// Only the cursor type itself is re-exported.
// NOTE(review): `KeyOrNumber` appears in `SnapshotCursor`'s public method signatures but is not
// re-exported here, so callers can only build it through `.into()` — confirm this is intended.
mod cursor;
pub use cursor::SnapshotCursor;
// Mask wrapper structs and the column selector traits.
mod mask;
pub use mask::*;
// Column selector implementations for the segment masks; no re-export here.
mod masks;

View File

@ -131,15 +131,16 @@ where
}
/// Returns a row, searching it by a key used during [`NippyJar::prepare_index`] by using a
/// `MASK` to only read certain columns from the row.
/// `mask` to only read certain columns from the row.
///
/// **May return false positives.**
///
/// Example usage would be querying a transactions file with a transaction hash which is **NOT**
/// stored in file.
pub fn row_by_key_with_cols<const MASK: usize, const COLUMNS: usize>(
pub fn row_by_key_with_cols(
&mut self,
key: &[u8],
mask: usize,
) -> Result<Option<RefRow<'_>>, NippyJarError> {
if let (Some(filter), Some(phf)) = (&self.jar.filter, &self.jar.phf) {
// TODO: is it worth to parallize both?
@ -153,7 +154,7 @@ where
.offsets_index
.access(row_index as usize)
.expect("built from same set") as u64;
return self.next_row_with_cols::<MASK, COLUMNS>()
return self.next_row_with_cols(mask)
}
}
} else {
@ -163,21 +164,20 @@ where
Ok(None)
}
/// Returns a row by its number by using a `MASK` to only read certain columns from the row.
pub fn row_by_number_with_cols<const MASK: usize, const COLUMNS: usize>(
/// Returns a row by its number by using a `mask` to only read certain columns from the row.
pub fn row_by_number_with_cols(
&mut self,
row: usize,
mask: usize,
) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.row = row as u64;
self.next_row_with_cols::<MASK, COLUMNS>()
self.next_row_with_cols(mask)
}
/// Returns the current value and advances the row.
///
/// Uses a `MASK` to only read certain columns from the row.
pub fn next_row_with_cols<const MASK: usize, const COLUMNS: usize>(
&mut self,
) -> Result<Option<RefRow<'_>>, NippyJarError> {
/// Uses a `mask` to only read certain columns from the row.
pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
@ -185,10 +185,11 @@ where
return Ok(None)
}
let mut row = Vec::with_capacity(COLUMNS);
let columns = self.jar.columns;
let mut row = Vec::with_capacity(columns);
for column in 0..COLUMNS {
if MASK & (1 << column) != 0 {
for column in 0..columns {
if mask & (1 << column) != 0 {
self.read_value(column, &mut row)?
}
}

View File

@ -903,14 +903,13 @@ mod tests {
// Imagine `Blocks` snapshot file has two columns: `Block | StoredWithdrawals`
const BLOCKS_FULL_MASK: usize = 0b11;
const BLOCKS_COLUMNS: usize = 2;
// Read both columns
for (row_num, (v0, v1)) in &data {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_FULL_MASK, BLOCKS_COLUMNS>(v0)
.row_by_key_with_cols(v0, BLOCKS_FULL_MASK)
.unwrap()
.unwrap()
.iter()
@ -920,7 +919,7 @@ mod tests {
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_FULL_MASK, BLOCKS_COLUMNS>(*row_num)
.row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_value, row_by_num);
@ -932,7 +931,7 @@ mod tests {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_BLOCK_MASK, BLOCKS_COLUMNS>(v0)
.row_by_key_with_cols(v0, BLOCKS_BLOCK_MASK)
.unwrap()
.unwrap()
.iter()
@ -943,7 +942,7 @@ mod tests {
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_BLOCK_MASK, BLOCKS_COLUMNS>(*row_num)
.row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
@ -956,7 +955,7 @@ mod tests {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
let row_by_value = cursor
.row_by_key_with_cols::<BLOCKS_WITHDRAWAL_MASK, BLOCKS_COLUMNS>(v0)
.row_by_key_with_cols(v0, BLOCKS_WITHDRAWAL_MASK)
.unwrap()
.unwrap()
.iter()
@ -967,7 +966,7 @@ mod tests {
// Simulates `by_number` queries
let row_by_num = cursor
.row_by_number_with_cols::<BLOCKS_WITHDRAWAL_MASK, BLOCKS_COLUMNS>(*row_num)
.row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
@ -980,14 +979,14 @@ mod tests {
// Simulates `by_hash` queries by iterating col1 values, which were used to
// create the inner index.
assert!(cursor
.row_by_key_with_cols::<BLOCKS_EMPTY_MASK, BLOCKS_COLUMNS>(v0)
.row_by_key_with_cols(v0, BLOCKS_EMPTY_MASK)
.unwrap()
.unwrap()
.is_empty());
// Simulates `by_number` queries
assert!(cursor
.row_by_number_with_cols::<BLOCKS_EMPTY_MASK, BLOCKS_COLUMNS>(*row_num)
.row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
.unwrap()
.unwrap()
.is_empty());

View File

@ -1,12 +1,9 @@
use super::LoadedJarRef;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_db::{
snapshot::{
SnapshotCursor, HEADER_COLUMNS, S_HEADER, S_HEADER_HASH, S_HEADER_TD,
S_HEADER_TD_WITH_HASH, S_HEADER_WITH_HASH,
},
table::{Decompress, Table},
CanonicalHeaders, HeaderTD,
codecs::CompactU256,
snapshot::{HeaderMask, SnapshotCursor},
table::Decompress,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{
@ -46,30 +43,25 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
Ok(self
.cursor()?
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(block_hash.into())?
.get_two::<HeaderMask<Header, BlockHash>>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.map(|(header, _)| header))
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
self.cursor()?.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())
self.cursor()?.get_one::<HeaderMask<Header>>(num.into())
}
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
Ok(self
.cursor()?
.get_two::<<HeaderTD as Table>::Value, <CanonicalHeaders as Table>::Value, S_HEADER_TD_WITH_HASH, HEADER_COLUMNS>(
block_hash.into(),
)?
.get_two::<HeaderMask<CompactU256, BlockHash>>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.map(|(td, _)| td.into()))
}
fn header_td_by_number(&self, num: BlockNumber) -> RethResult<Option<U256>> {
Ok(self
.cursor()?
.get_one::<<HeaderTD as Table>::Value, S_HEADER_TD, HEADER_COLUMNS>(num.into())?
.map(Into::into))
Ok(self.cursor()?.get_one::<HeaderMask<CompactU256>>(num.into())?.map(Into::into))
}
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
@ -79,7 +71,7 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
let mut headers = Vec::with_capacity((range.end - range.start) as usize);
for num in range.start..range.end {
match cursor.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())? {
match cursor.get_one::<HeaderMask<Header>>(num.into())? {
Some(header) => headers.push(header),
None => return Ok(headers),
}
@ -98,9 +90,7 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
let mut headers = Vec::with_capacity((range.end - range.start) as usize);
for number in range.start..range.end {
match cursor
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())?
{
match cursor.get_two::<HeaderMask<Header, BlockHash>>(number.into())? {
Some((header, hash)) => headers.push(header.seal(hash)),
None => return Ok(headers),
}
@ -111,16 +101,14 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn sealed_header(&self, number: BlockNumber) -> RethResult<Option<SealedHeader>> {
Ok(self
.cursor()?
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())?
.get_two::<HeaderMask<Header, BlockHash>>(number.into())?
.map(|(header, hash)| header.seal(hash)))
}
}
impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, number: u64) -> RethResult<Option<B256>> {
self.cursor()?.get_one::<<CanonicalHeaders as Table>::Value, S_HEADER_HASH, HEADER_COLUMNS>(
number.into(),
)
self.cursor()?.get_one::<HeaderMask<BlockHash>>(number.into())
}
fn canonical_hashes_range(
@ -158,7 +146,7 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
self.cursor()?
.row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)?
.row_by_number_with_cols((num - self.user_header().tx_start()) as usize, 0b1)?
.ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
@ -178,7 +166,7 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
let mut cursor = self.cursor()?;
let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
cursor.row_by_key_with_cols(&hash.0, 0b1).unwrap().unwrap()[0],
)
.unwrap()
.with_hash();