fix(db): add traits Encode/Decode for keys and Compress/Uncompress for values (#151)

* add UncompressedUint

* add more UncompressedUint keys

* add docs

* split key/value to encode/decode and compress/uncompress traits

* revert into

* clippy

* rm whitespaces

* remove TODO

* Remove scale encode/decode traits

* decompress

* clippy

Co-authored-by: rakita <dragan0rakita@gmail.com>
Author:    joshieDo
Date:      2022-11-03 15:02:19 +08:00
Committer: GitHub
Commit:    becceb2802 (parent 3c72a12aff)

18 changed files with 195 additions and 66 deletions


@@ -304,7 +304,7 @@ mod tests {
 parent.gas_used = 17763076;
 parent.gas_limit = 30000000;
 parent.base_fee_per_gas = Some(0x28041f7f5);
-parent.number = parent.number - 1;
+parent.number -= 1;
 let ommers = Vec::new();
 let receipts = Vec::new();


@@ -4,8 +4,8 @@ use std::marker::PhantomData;
 use crate::utils::*;
 use reth_interfaces::db::{
-DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupSort, DupWalker, Encode, Error, Table,
-Walker,
+Compress, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupSort, DupWalker, Encode,
+Error, Table, Walker,
 };
 use reth_libmdbx::{self, TransactionKind, WriteFlags, RO, RW};
@@ -128,13 +128,13 @@ impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> {
 fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
 // Default `WriteFlags` is UPSERT
 self.inner
-.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::UPSERT)
+.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::UPSERT)
 .map_err(|e| Error::Internal(e.into()))
 }
 fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
 self.inner
-.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND)
+.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND)
 .map_err(|e| Error::Internal(e.into()))
 }
@@ -150,7 +150,7 @@ impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for Cursor<'tx, RW, T> {
 fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
 self.inner
-.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND_DUP)
+.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND_DUP)
 .map_err(|e| Error::Internal(e.into()))
 }
 }
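The cursor changes above keep `encode()` for keys but route values through `compress()`. A toy, standalone sketch of why the key side stays order-preserving while the value bytes are opaque (a `BTreeMap` stands in for MDBX; none of these helpers are reth APIs):

```rust
use std::collections::BTreeMap;

// Toy stand-in for the MDBX put path: keys go through an order-preserving
// encode, values through a (here trivial) compress.

fn encode_key(block_number: u64) -> [u8; 8] {
    // Big-endian, so lexicographic byte order matches numeric order,
    // which is what `append()` relies on.
    block_number.to_be_bytes()
}

fn compress_value(header_bytes: Vec<u8>) -> Vec<u8> {
    // Identity here; a real impl may apply SCALE or another codec.
    header_bytes
}

fn main() {
    // BTreeMap orders by key bytes, mimicking how MDBX keeps keys sorted.
    let mut table: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();

    for (num, body) in [(2u64, vec![0xbb]), (1, vec![0xaa]), (10, vec![0xcc])] {
        // upsert(key, value): key.encode().as_ref(), value.compress().as_ref()
        table.insert(encode_key(num).to_vec(), compress_value(body));
    }

    // Iteration follows the encoded key bytes: 1, 2, 10.
    let order: Vec<u64> = table
        .keys()
        .map(|k| u64::from_be_bytes(k.as_slice().try_into().unwrap()))
        .collect();
    assert_eq!(order, vec![1, 2, 10]);
}
```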


@@ -269,7 +269,7 @@ mod tests {
 {
 let tx = env.tx().expect(ERROR_INIT_TX);
 let mut cursor = tx.cursor_dup::<PlainStorageState>().unwrap();
-let mut walker = cursor.walk_dup(key.into(), H256::from_low_u64_be(1)).unwrap();
+let mut walker = cursor.walk_dup(key, H256::from_low_u64_be(1)).unwrap();
 assert_eq!(
 value11,
 walker


@@ -1,7 +1,9 @@
 //! Transaction wrapper for libmdbx-sys.
 use crate::{kv::cursor::Cursor, utils::decode_one};
-use reth_interfaces::db::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT, DupSort, Encode, Error, Table};
+use reth_interfaces::db::{
+Compress, DbTx, DbTxGAT, DbTxMut, DbTxMutGAT, DupSort, Encode, Error, Table,
+};
 use reth_libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW};
 use std::marker::PhantomData;
@@ -82,7 +84,7 @@ impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
 .put(
 &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
 &key.encode(),
-&value.encode(),
+&value.compress(),
 WriteFlags::UPSERT,
 )
 .map_err(|e| Error::Internal(e.into()))
@@ -91,7 +93,7 @@ impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
 fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, Error> {
 let mut data = None;
-let value = value.map(Encode::encode);
+let value = value.map(Compress::compress);
 if let Some(value) = &value {
 data = Some(value.as_ref());
 };


@@ -2,7 +2,7 @@
 //suse crate::kv::Error;
 use bytes::Bytes;
-use reth_interfaces::db::{Decode, Error, Table};
+use reth_interfaces::db::{Decode, Decompress, Error, Table};
 use std::borrow::Cow;
 /// Returns the default page size that can be used in this OS.
@@ -26,10 +26,11 @@ pub(crate) fn decoder<'a, T>(
 where
 T: Table,
 T::Key: Decode,
+T::Value: Decompress,
 {
 Ok((
 Decode::decode(Bytes::from(kv.0.into_owned()))?,
-Decode::decode(Bytes::from(kv.1.into_owned()))?,
+Decompress::decompress(Bytes::from(kv.1.into_owned()))?,
 ))
 }
@@ -38,7 +39,7 @@ pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result<
 where
 T: Table,
 {
-Decode::decode(Bytes::from(kv.1.into_owned()))
+Decompress::decompress(Bytes::from(kv.1.into_owned()))
 }
 /// Helper function to decode a value. It can be a key or subkey.
@@ -46,5 +47,5 @@ pub(crate) fn decode_one<T>(value: Cow<'_, [u8]>) -> Result<T::Value, Error>
 where
 T: Table,
 {
-Decode::decode(Bytes::from(value.into_owned()))
+Decompress::decompress(Bytes::from(value.into_owned()))
 }


@@ -8,7 +8,7 @@ mod inputs;
 /// Some types like [`IntegerList`] might have some restrictons on how they're fuzzed. For example,
 /// the list is assumed to be sorted before creating the object.
 macro_rules! impl_fuzzer_with_input {
-($(($name:tt, $input_type:tt)),+) => {
+($(($name:tt, $input_type:tt, $encode:tt, $encode_method:tt, $decode:tt, $decode_method:tt)),+) => {
 $(
 /// Macro generated module to be used by test-fuzz and `bench` if it applies.
 #[allow(non_snake_case)]
@@ -29,11 +29,11 @@ macro_rules! impl_fuzzer_with_input {
 /// This method is used for benchmarking, so its parameter should be the actual type that is being tested.
 pub fn encode_and_decode(obj: $name) -> (usize, $name)
 {
-let data = table::Encode::encode(obj);
+let data = table::$encode::$encode_method(obj);
 let size = data.len();
 // Some `data` might be a fixed array.
-(size, table::Decode::decode(data.to_vec()).expect("failed to decode"))
+(size, table::$decode::$decode_method(data.to_vec()).expect("failed to decode"))
 }
 #[cfg(test)]
@@ -56,14 +56,36 @@ macro_rules! impl_fuzzer_with_input {
 /// Fuzzer generates a random instance of the object and proceeds to encode and decode it. It then
 /// makes sure that it matches the original object.
-macro_rules! impl_fuzzer {
+macro_rules! impl_fuzzer_key {
 ($($name:tt),+) => {
 $(
-impl_fuzzer_with_input!(($name, $name));
+impl_fuzzer_with_input!(($name, $name, Encode, encode, Decode, decode));
 )+
 };
 }
-impl_fuzzer!(Header, Account, BlockNumHash, TxNumberAddress);
+/// Fuzzer generates a random instance of the object and proceeds to compress and decompress it. It
+/// then makes sure that it matches the original object.
+macro_rules! impl_fuzzer_value {
+($($name:tt),+) => {
+$(
+impl_fuzzer_with_input!(($name, $name, Compress, compress, Decompress, decompress));
+)+
+};
+}
-impl_fuzzer_with_input!((IntegerList, IntegerListInput));
+/// Fuzzer generates a random instance of the object and proceeds to compress and decompress it. It
+/// then makes sure that it matches the original object. It supports being fed a different kind of
+/// input, as long as it supports Into<T>.
+macro_rules! impl_fuzzer_value_with_input {
+($(($name:tt, $input:tt)),+) => {
+$(
+impl_fuzzer_with_input!(($name, $input, Compress, compress, Decompress, decompress));
+)+
+};
+}
+impl_fuzzer_value!(Header, Account);
+impl_fuzzer_key!(BlockNumHash, TxNumberAddress);
+impl_fuzzer_value_with_input!((IntegerList, IntegerListInput));
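The extra macro parameters let one fuzzer body round-trip either trait pair, depending on whether the type is used as a key or a value. A rough standalone illustration of that parameterization (local stand-in traits and types, not the reth_interfaces ones):

```rust
// Local stand-in traits: keys encode/decode, values compress/decompress.
trait Encode { fn encode(self) -> Vec<u8>; }
trait Decode: Sized { fn decode(raw: &[u8]) -> Self; }
trait Compress { fn compress(self) -> Vec<u8>; }
trait Decompress: Sized { fn decompress(raw: &[u8]) -> Self; }

#[derive(Debug, PartialEq)]
struct BlockNum(u64); // a key-like type
impl Encode for BlockNum { fn encode(self) -> Vec<u8> { self.0.to_be_bytes().to_vec() } }
impl Decode for BlockNum {
    fn decode(raw: &[u8]) -> Self { BlockNum(u64::from_be_bytes(raw.try_into().unwrap())) }
}

#[derive(Debug, PartialEq)]
struct Payload(Vec<u8>); // a value-like type
impl Compress for Payload { fn compress(self) -> Vec<u8> { self.0 } }
impl Decompress for Payload { fn decompress(raw: &[u8]) -> Self { Payload(raw.to_vec()) } }

// Mirrors the idea of the $encode/$encode_method/$decode/$decode_method
// parameters: the same round-trip body is stamped out for either trait pair.
macro_rules! round_trip {
    ($name:ident, $ty:ty, $enc:ident::$enc_m:ident, $dec:ident::$dec_m:ident) => {
        fn $name(obj: $ty) -> (usize, $ty) {
            let data = $enc::$enc_m(obj);
            (data.len(), $dec::$dec_m(&data))
        }
    };
}

round_trip!(encode_and_decode_key, BlockNum, Encode::encode, Decode::decode);
round_trip!(compress_and_decompress_value, Payload, Compress::compress, Decompress::decompress);

fn main() {
    assert_eq!(encode_and_decode_key(BlockNum(5)).1, BlockNum(5));
    assert_eq!(compress_and_decompress_value(Payload(vec![9])).1, Payload(vec![9]));
}
```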


@@ -14,7 +14,6 @@ use reth_primitives::*;
 // }
 //
 // impl_heapless_postcard!(T, MaxSize(T))
 macro_rules! impl_postcard {
 ($($name:tt),+) => {
 $(


@@ -1,47 +1,59 @@
-use crate::db::{models::accounts::AccountBeforeTx, Decode, Encode, Error};
+use crate::db::{models::accounts::AccountBeforeTx, Compress, Decompress, Error};
 use parity_scale_codec::decode_from_bytes;
 use reth_primitives::*;
 mod sealed {
 pub trait Sealed {}
 }
-/// Marker trait type to restrict the TableEncode and TableDecode with scale to chosen types.
-pub trait ScaleOnly: sealed::Sealed {}
-impl<T> Encode for T
+/// Marker trait type to restrict the [`Compress`] and [`Decompress`] with scale to chosen types.
+pub trait ScaleValue: sealed::Sealed {}
+impl<T> Compress for T
 where
-T: ScaleOnly + parity_scale_codec::Encode + Sync + Send + std::fmt::Debug,
+T: ScaleValue + parity_scale_codec::Encode + Sync + Send + std::fmt::Debug,
 {
-type Encoded = Vec<u8>;
+type Compressed = Vec<u8>;
-fn encode(self) -> Self::Encoded {
+fn compress(self) -> Self::Compressed {
 parity_scale_codec::Encode::encode(&self)
 }
 }
-impl<T> Decode for T
+impl<T> Decompress for T
 where
-T: ScaleOnly + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug,
+T: ScaleValue + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug,
 {
-fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<T, Error> {
+fn decompress<B: Into<bytes::Bytes>>(value: B) -> Result<T, Error> {
 decode_from_bytes(value.into()).map_err(|e| Error::Decode(e.into()))
 }
 }
+/// Implements SCALE both for value and key types.
 macro_rules! impl_scale {
 ($($name:tt),+) => {
 $(
-impl ScaleOnly for $name {}
+impl ScaleValue for $name {}
 impl sealed::Sealed for $name {}
 )+
 };
 }
-impl ScaleOnly for Vec<u8> {}
+/// Implements SCALE only for value types.
+macro_rules! impl_scale_value {
+($($name:tt),+) => {
+$(
+impl ScaleValue for $name {}
+impl sealed::Sealed for $name {}
+)+
+};
+}
+impl ScaleValue for Vec<u8> {}
 impl sealed::Sealed for Vec<u8> {}
-impl_scale!(u8, u32, u16, u64, U256, H256, H160);
+impl_scale!(U256, H256, H160);
 impl_scale!(Header, Account, Log, Receipt, TxType, StorageEntry);
 impl_scale!(AccountBeforeTx);
+impl_scale_value!(u8, u32, u16, u64);
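The SCALE impls hinge on a sealed marker trait: only types that opt in get the blanket value codec, so key types can never silently pick it up. A standalone sketch of that pattern, with a trivial byte-copy codec standing in for parity-scale-codec and `Into`/`From` bounds replacing the real codec bounds:

```rust
// Sealed-marker pattern: blanket Compress/Decompress impls are gated on an
// opt-in marker, so only chosen value types get them. The "codec" is a
// trivial stand-in, not parity-scale-codec.

mod sealed {
    pub trait Sealed {}
}

/// Only types carrying this marker get the blanket value impls.
trait ScaleValue: sealed::Sealed {}

trait Compress {
    fn compress(self) -> Vec<u8>;
}
trait Decompress: Sized {
    fn decompress(raw: &[u8]) -> Option<Self>;
}

// Blanket impls gated on the marker (the diff gates on ScaleValue + codec bounds).
impl<T: ScaleValue + Into<Vec<u8>>> Compress for T {
    fn compress(self) -> Vec<u8> {
        self.into()
    }
}
impl<T: ScaleValue + From<Vec<u8>>> Decompress for T {
    fn decompress(raw: &[u8]) -> Option<Self> {
        Some(T::from(raw.to_vec()))
    }
}

// Opt a value type in; key types simply never implement the marker.
struct Receipt(Vec<u8>);
impl sealed::Sealed for Receipt {}
impl ScaleValue for Receipt {}
impl From<Receipt> for Vec<u8> {
    fn from(r: Receipt) -> Self {
        r.0
    }
}
impl From<Vec<u8>> for Receipt {
    fn from(v: Vec<u8>) -> Self {
        Receipt(v)
    }
}

fn main() {
    let bytes = Receipt(vec![1, 2, 3]).compress();
    let back = Receipt::decompress(&bytes).unwrap();
    assert_eq!(back.0, vec![1, 2, 3]);
}
```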


@@ -66,7 +66,7 @@ impl Decode for TxNumberAddress {
 .try_into()
 .map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
 );
-let hash = Address::decode(value.slice(8..))?;
+let hash = Address::from_slice(&value.slice(8..));
 Ok(TxNumberAddress((num, hash)))
 }


@@ -65,7 +65,7 @@ impl Decode for BlockNumHash {
 .try_into()
 .map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
 );
-let hash = H256::decode(value.slice(8..))?;
+let hash = H256::from_slice(&value.slice(8..));
 Ok(BlockNumHash((num, hash)))
 }
@@ -88,7 +88,7 @@ mod test {
 bytes[..8].copy_from_slice(&num.to_be_bytes());
 bytes[8..].copy_from_slice(&hash.0);
-let encoded = Encode::encode(key.clone());
+let encoded = Encode::encode(key);
 assert_eq!(encoded, bytes);
 let decoded: BlockNumHash = Decode::decode(encoded.to_vec()).unwrap();
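Switching from `H256::decode(...)?` to `H256::from_slice(...)` works because the tail of the key is a fixed 32-byte hash, so no fallible key decode is needed once the length prefix has been split off. A small stand-in sketch of that composite-key decode (`Hash256` plays the role of `H256`; error handling is simplified):

```rust
// Composite key layout: 8-byte big-endian block number, then a 32-byte hash.
// The hash part is a plain fixed-size copy, not a fallible decode.

#[derive(Debug, PartialEq)]
struct Hash256([u8; 32]);

impl Hash256 {
    fn from_slice(raw: &[u8]) -> Self {
        let mut out = [0u8; 32];
        out.copy_from_slice(raw); // panics if raw.len() != 32, like H256::from_slice
        Hash256(out)
    }
}

fn decode_block_num_hash(raw: &[u8]) -> Result<(u64, Hash256), String> {
    let num = u64::from_be_bytes(
        raw.get(..8)
            .ok_or("key too short")?
            .try_into()
            .map_err(|_| "into bytes error")?,
    );
    let hash = Hash256::from_slice(&raw[8..]);
    Ok((num, hash))
}

fn main() {
    let mut key = Vec::new();
    key.extend_from_slice(&42u64.to_be_bytes());
    key.extend_from_slice(&[0xab; 32]);
    assert_eq!(decode_block_num_hash(&key).unwrap(), (42, Hash256([0xab; 32])));
}
```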


@@ -1,22 +1,22 @@
-//! Implements [`Encode`] and [`Decode`] for [`IntegerList`]
+//! Implements [`Compress`] and [`Decompress`] for [`IntegerList`]
 use crate::db::{
 error::Error,
-table::{Decode, Encode},
+table::{Compress, Decompress},
 };
 use bytes::Bytes;
 use reth_primitives::IntegerList;
-impl Encode for IntegerList {
-type Encoded = Vec<u8>;
+impl Compress for IntegerList {
+type Compressed = Vec<u8>;
-fn encode(self) -> Self::Encoded {
+fn compress(self) -> Self::Compressed {
 self.to_bytes()
 }
 }
-impl Decode for IntegerList {
-fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
+impl Decompress for IntegerList {
+fn decompress<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
 IntegerList::from_bytes(&value.into()).map_err(|e| Error::Decode(eyre::eyre!("{e}")))
 }
 }


@@ -7,4 +7,79 @@ pub mod sharded_key;
 pub use accounts::*;
 pub use blocks::*;
+use reth_primitives::{Address, H256};
 pub use sharded_key::ShardedKey;
+use crate::db::{
+table::{Decode, Encode},
+Error,
+};
+use eyre::eyre;
+/// Macro that implements [`Encode`] and [`Decode`] for uint types.
+macro_rules! impl_uints {
+($(($name:tt, $size:tt)),+) => {
+$(
+impl Encode for $name
+{
+type Encoded = [u8; $size];
+fn encode(self) -> Self::Encoded {
+self.to_be_bytes()
+}
+}
+impl Decode for $name
+{
+fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, Error> {
+let value: bytes::Bytes = value.into();
+Ok(
+$name::from_be_bytes(
+value.as_ref().try_into().map_err(|_| Error::Decode(eyre!("Into bytes error.")))?
+)
+)
+}
+}
+)+
+};
+}
+impl_uints!((u64, 8), (u32, 4), (u16, 2), (u8, 1));
+impl Encode for Vec<u8> {
+type Encoded = Vec<u8>;
+fn encode(self) -> Self::Encoded {
+self
+}
+}
+impl Decode for Vec<u8> {
+fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, Error> {
+Ok(value.into().to_vec())
+}
+}
+impl Encode for Address {
+type Encoded = [u8; 20];
+fn encode(self) -> Self::Encoded {
+self.to_fixed_bytes()
+}
+}
+impl Decode for Address {
+fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, Error> {
+Ok(Address::from_slice(&value.into()[..]))
+}
+}
+impl Encode for H256 {
+type Encoded = [u8; 32];
+fn encode(self) -> Self::Encoded {
+self.to_fixed_bytes()
+}
+}
+impl Decode for H256 {
+fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, Error> {
+Ok(H256::from_slice(&value.into()[..]))
+}
+}
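The uint key impls above use `to_be_bytes`. A two-line check of why big-endian (and not little-endian) keeps MDBX's byte-wise key order aligned with numeric order:

```rust
fn main() {
    let a: u64 = 1;
    let b: u64 = 256;

    // Big-endian: byte comparison agrees with numeric comparison.
    assert!(a.to_be_bytes() < b.to_be_bytes());

    // Little-endian: 256 -> [0, 1, 0, ...] sorts *before* 1 -> [1, 0, 0, ...].
    assert!(b.to_le_bytes() < a.to_le_bytes());
}
```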


@@ -5,12 +5,27 @@ use std::{
 marker::{Send, Sync},
 };
+/// Trait that will transform the data to be saved in the DB in a (ideally) compressed format
+pub trait Compress: Send + Sync + Sized + Debug {
+/// Compressed type.
+type Compressed: AsRef<[u8]> + Send + Sync;
+/// Compresses data going into the database.
+fn compress(self) -> Self::Compressed;
+}
+/// Trait that will transform the data to be read from the DB.
+pub trait Decompress: Send + Sync + Sized + Debug {
+/// Decompresses data coming from the database.
+fn decompress<B: Into<Bytes>>(value: B) -> Result<Self, Error>;
+}
 /// Trait that will transform the data to be saved in the DB.
 pub trait Encode: Send + Sync + Sized + Debug {
 /// Encoded type.
 type Encoded: AsRef<[u8]> + Send + Sync;
-/// Decodes data going into the database.
+/// Encodes data going into the database.
 fn encode(self) -> Self::Encoded;
 }
@@ -20,10 +35,15 @@ pub trait Decode: Send + Sync + Sized + Debug {
 fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error>;
 }
-/// Generic trait that enforces the database value to implement [`Encode`] and [`Decode`].
-pub trait Object: Encode + Decode {}
+/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].
+pub trait Key: Encode + Decode {}
-impl<T> Object for T where T: Encode + Decode {}
+impl<T> Key for T where T: Encode + Decode {}
+/// Generic trait that enforces the database value to implement [`Compress`] and [`Decompress`].
+pub trait Value: Compress + Decompress {}
+impl<T> Value for T where T: Compress + Decompress {}
 /// Generic trait that a database table should follow.
 ///
@@ -39,11 +59,11 @@ pub trait Table: Send + Sync + Debug + 'static {
 /// Key element of `Table`.
 ///
 /// Sorting should be taken into account when encoding this.
-type Key: Object;
+type Key: Key;
 /// Value element of `Table`.
-type Value: Object;
+type Value: Value;
 /// Seek Key element of `Table`.
-type SeekKey: Object;
+type SeekKey: Key;
 }
 /// DupSort allows for keys not to be repeated in the database,
@@ -52,5 +72,5 @@ pub trait DupSort: Table {
 /// Subkey type. For more check https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48
 ///
 /// Sorting should be taken into account when encoding this.
-type SubKey: Object;
+type SubKey: Key;
 }
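With the traits above in place, a table's key is bounded by `Key` (`Encode + Decode`) and its value by `Value` (`Compress + Decompress`). A minimal standalone sketch of that shape, with simplified signatures (no `bytes::Bytes`, no error type) and a `Vec`-backed store instead of MDBX:

```rust
// Simplified stand-ins for the reth_interfaces traits shown in the diff.
trait Encode {
    type Encoded: AsRef<[u8]>;
    fn encode(self) -> Self::Encoded;
}
trait Decode: Sized {
    fn decode(raw: &[u8]) -> Option<Self>;
}
trait Compress {
    type Compressed: AsRef<[u8]>;
    fn compress(self) -> Self::Compressed;
}
trait Decompress: Sized {
    fn decompress(raw: &[u8]) -> Option<Self>;
}

trait Key: Encode + Decode {}
impl<T: Encode + Decode> Key for T {}
trait Value: Compress + Decompress {}
impl<T: Compress + Decompress> Value for T {}

trait Table {
    const NAME: &'static str;
    type Key: Key;
    type Value: Value;
}

// A toy table: block number -> raw header bytes.
impl Encode for u64 {
    type Encoded = [u8; 8];
    fn encode(self) -> Self::Encoded { self.to_be_bytes() }
}
impl Decode for u64 {
    fn decode(raw: &[u8]) -> Option<Self> { Some(u64::from_be_bytes(raw.try_into().ok()?)) }
}
impl Compress for Vec<u8> {
    type Compressed = Vec<u8>;
    fn compress(self) -> Self::Compressed { self }
}
impl Decompress for Vec<u8> {
    fn decompress(raw: &[u8]) -> Option<Self> { Some(raw.to_vec()) }
}

struct Headers;
impl Table for Headers {
    const NAME: &'static str = "Headers";
    type Key = u64;
    type Value = Vec<u8>;
}

// Generic put/get mirror the cursor/tx call sites: encode the key, compress the value.
fn put<T: Table>(store: &mut Vec<(Vec<u8>, Vec<u8>)>, key: T::Key, value: T::Value) {
    store.push((key.encode().as_ref().to_vec(), value.compress().as_ref().to_vec()));
}
fn get<T: Table>(store: &[(Vec<u8>, Vec<u8>)], key: T::Key) -> Option<T::Value> {
    let raw = key.encode();
    let (_, v) = store.iter().find(|(k, _)| k.as_slice() == raw.as_ref())?;
    T::Value::decompress(v)
}

fn main() {
    let mut store = Vec::new();
    put::<Headers>(&mut store, 1, vec![0xde, 0xad]);
    assert_eq!(get::<Headers>(&store, 1), Some(vec![0xde, 0xad]));
}
```

The real definitions take `bytes::Bytes` and return `Result<_, Error>`, and the store is an MDBX cursor or transaction rather than a `Vec`; only the bound structure is the same.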


@@ -17,7 +17,7 @@ pub fn setup_bench_db(num_rows: u32) -> (TempDir, Environment<NoWriteMap>) {
 let txn = env.begin_rw_txn().unwrap();
 let db = txn.open_db(None).unwrap();
 for i in 0..num_rows {
-txn.put(&db, &get_key(i), &get_data(i), WriteFlags::empty()).unwrap();
+txn.put(&db, get_key(i), get_data(i), WriteFlags::empty()).unwrap();
 }
 txn.commit().unwrap();
 }


@@ -52,7 +52,7 @@ mod test_utils {
 LittleEndian::write_u64(&mut value, height);
 let tx = env.begin_rw_txn().expect("begin_rw_txn");
 let index = tx.create_db(None, DatabaseFlags::DUP_SORT).expect("open index db");
-tx.put(&index, &HEIGHT_KEY, &value, WriteFlags::empty()).expect("tx.put");
+tx.put(&index, HEIGHT_KEY, value, WriteFlags::empty()).expect("tx.put");
 tx.commit().expect("tx.commit");
 }
 }


@@ -101,7 +101,7 @@ fn test_stat() {
 let mut value = [0u8; 8];
 LittleEndian::write_u64(&mut value, i);
 let tx = env.begin_rw_txn().expect("begin_rw_txn");
-tx.put(&tx.open_db(None).unwrap(), &value, &value, WriteFlags::default()).expect("tx.put");
+tx.put(&tx.open_db(None).unwrap(), value, value, WriteFlags::default()).expect("tx.put");
 tx.commit().expect("tx.commit");
 }
@@ -143,7 +143,7 @@ fn test_freelist() {
 let mut value = [0u8; 8];
 LittleEndian::write_u64(&mut value, i);
 let tx = env.begin_rw_txn().expect("begin_rw_txn");
-tx.put(&tx.open_db(None).unwrap(), &value, &value, WriteFlags::default()).expect("tx.put");
+tx.put(&tx.open_db(None).unwrap(), value, value, WriteFlags::default()).expect("tx.put");
 tx.commit().expect("tx.commit");
 }
 let tx = env.begin_rw_txn().expect("begin_rw_txn");


@@ -261,7 +261,7 @@ fn test_concurrent_writers() {
 threads.push(thread::spawn(move || {
 let txn = writer_env.begin_rw_txn().unwrap();
 let db = txn.open_db(None).unwrap();
-txn.put(&db, &format!("{}{}", key, i), &format!("{}{}", val, i), WriteFlags::empty())
+txn.put(&db, format!("{}{}", key, i), format!("{}{}", val, i), WriteFlags::empty())
 .unwrap();
 txn.commit().is_ok()
 }));


@@ -297,8 +297,7 @@ pub mod tests {
 // Validate that all necessary tables are updated after the
 // header download with some previous progress.
 async fn headers_stage_prev_progress() {
-// TODO: set bigger range once `MDBX_EKEYMISMATCH` issue is resolved
-let (start, end) = (10000, 10240);
+let (start, end) = (10000, 10241);
 let head = gen_random_header(start, None);
 let headers = gen_random_header_range(start + 1..end, head.hash());
 let db = HeadersDB::default();
@@ -374,7 +373,7 @@ pub mod tests {
 let db = HeadersDB::default();
 db.insert_header(&head).expect("failed to insert header");
 for header in first_range.iter().chain(second_range.iter()) {
-db.insert_header(&header).expect("failed to insert header");
+db.insert_header(header).expect("failed to insert header");
 }
 let input = UnwindInput { bad_block: None, stage_progress: 15, unwind_to: 15 };
@@ -408,8 +407,7 @@ pub mod tests {
 let consensus = Arc::new(TestConsensus::default());
 let downloader = test_utils::TestDownloader::new(download_result);
-let mut stage =
-HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader };
+let mut stage = HeaderStage { consensus: consensus.clone(), client, downloader };
 tokio::spawn(async move {
 let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
 let result = stage.execute(&mut db, input).await;
@@ -525,7 +523,7 @@ pub mod tests {
 let mut cursor_td = tx.cursor_mut::<tables::HeaderTD>()?;
 let td =
-U256::from_big_endian(&cursor_td.last()?.map(|(_, v)| v).unwrap_or(vec![]));
+U256::from_big_endian(&cursor_td.last()?.map(|(_, v)| v).unwrap_or_default());
 cursor_td
 .append(key, H256::from_uint(&(td + header.difficulty)).as_bytes().to_vec())?;
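The last hunk reads the previous total difficulty from `HeaderTD` (falling back to zero via `unwrap_or_default`) and appends the running sum for the new block. A small stand-in for that accumulation, with `u128` instead of `U256` and a `Vec` instead of the cursor:

```rust
// Take the last stored total difficulty (or zero when the table is empty),
// add the new header's difficulty, and append it under the new block number.
fn main() {
    let difficulties: [u128; 3] = [100, 250, 90];
    // (block number, cumulative total difficulty)
    let mut header_td: Vec<(u64, u128)> = Vec::new();

    for (i, diff) in difficulties.iter().enumerate() {
        let td = header_td.last().map(|(_, td)| *td).unwrap_or_default();
        header_td.push((i as u64 + 1, td + diff));
    }

    assert_eq!(header_td, vec![(1, 100), (2, 350), (3, 440)]);
}
```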