feat(db): db error handling and Provider scaffolding (#154)

* feat(db): db error handling and Provider scaffolding
* remove one map_err
* fix after merge
This commit is contained in:
rakita
2022-11-03 17:00:58 +01:00
committed by GitHub
parent bff27a4154
commit 699db03187
21 changed files with 129 additions and 102 deletions

1
Cargo.lock generated
View File

@ -3194,7 +3194,6 @@ dependencies = [
"async-trait",
"auto_impl",
"bytes",
"eyre",
"futures",
"heapless",
"parity-scale-codec",

View File

@ -36,7 +36,7 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> {
#[macro_export]
macro_rules! decode {
($v:expr) => {
$v.map_err(|e| Error::Decode(e.into()))?.map(decoder::<T>).transpose()
$v.map_err(|e| Error::Read(e.into()))?.map(decoder::<T>).transpose()
};
}
@ -79,7 +79,7 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
let start = self
.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Internal(e.into()))?
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>);
Ok(Walker::<'cursor, 'tx, T, Self> { cursor: self, start, _tx_phantom: PhantomData {} })
@ -99,11 +99,7 @@ impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx,
/// Returns the next `value` of a duplicate `key`.
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner
.next_dup()
.map_err(|e| Error::Internal(e.into()))?
.map(decode_value::<T>)
.transpose()
self.inner.next_dup().map_err(|e| Error::Read(e.into()))?.map(decode_value::<T>).transpose()
}
/// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table.
@ -115,7 +111,7 @@ impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx,
let start = self
.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| Error::Internal(e.into()))?
.map_err(|e| Error::Read(e.into()))?
.map(decode_one::<T>);
Ok(DupWalker::<'cursor, 'tx, T, Self> { cursor: self, start, _tx_phantom: PhantomData {} })
@ -129,28 +125,28 @@ impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> {
// Default `WriteFlags` is UPSERT
self.inner
.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::UPSERT)
.map_err(|e| Error::Internal(e.into()))
.map_err(|e| Error::Write(e.into()))
}
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND)
.map_err(|e| Error::Internal(e.into()))
.map_err(|e| Error::Write(e.into()))
}
fn delete_current(&mut self) -> Result<(), Error> {
self.inner.del(WriteFlags::CURRENT).map_err(|e| Error::Internal(e.into()))
self.inner.del(WriteFlags::CURRENT).map_err(|e| Error::Delete(e.into()))
}
}
impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for Cursor<'tx, RW, T> {
fn delete_current_duplicates(&mut self) -> Result<(), Error> {
self.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| Error::Internal(e.into()))
self.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| Error::Delete(e.into()))
}
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND_DUP)
.map_err(|e| Error::Internal(e.into()))
.map_err(|e| Error::Write(e.into()))
}
}

View File

@ -39,11 +39,11 @@ impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env<E> {
impl<E: EnvironmentKind> Database for Env<E> {
fn tx(&self) -> Result<<Self as DatabaseGAT<'_>>::TX, Error> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::Internal(e.into()))?))
Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::InitTransaction(e.into()))?))
}
fn tx_mut(&self) -> Result<<Self as DatabaseGAT<'_>>::TXMut, Error> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::Internal(e.into()))?))
Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::InitTransaction(e.into()))?))
}
}
@ -73,7 +73,7 @@ impl<E: EnvironmentKind> Env<E> {
..Default::default()
})
.open(path)
.map_err(|e| Error::Internal(e.into()))?,
.map_err(|e| Error::DatabaseLocation(e.into()))?,
};
Ok(env)
@ -81,7 +81,7 @@ impl<E: EnvironmentKind> Env<E> {
/// Creates all the defined tables, if necessary.
pub fn create_tables(&self) -> Result<(), Error> {
let tx = self.inner.begin_rw_txn().map_err(|e| Error::Initialization(e.into()))?;
let tx = self.inner.begin_rw_txn().map_err(|e| Error::InitTransaction(e.into()))?;
for (table_type, table) in TABLES {
let flags = match table_type {
@ -89,10 +89,10 @@ impl<E: EnvironmentKind> Env<E> {
TableType::DupSort => DatabaseFlags::DUP_SORT,
};
tx.create_db(Some(table), flags).map_err(|e| Error::Initialization(e.into()))?;
tx.create_db(Some(table), flags).map_err(|e| Error::TableCreation(e.into()))?;
}
tx.commit().map_err(|e| Error::Initialization(e.into()))?;
tx.commit().map_err(|e| Error::Commit(e.into()))?;
Ok(())
}

View File

@ -33,8 +33,10 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
Ok(Cursor {
inner: self
.inner
.cursor(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?)
.map_err(|e| Error::Internal(e.into()))?,
.cursor(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::InitCursor(e.into()))?,
)
.map_err(|e| Error::InitCursor(e.into()))?,
table: T::NAME,
_dbi: PhantomData,
})
@ -63,16 +65,16 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
}
fn commit(self) -> Result<bool, Error> {
self.inner.commit().map_err(|e| Error::Internal(e.into()))
self.inner.commit().map_err(|e| Error::Commit(e.into()))
}
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, Error> {
self.inner
.get(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Read(e.into()))?,
key.encode().as_ref(),
)
.map_err(|e| Error::Internal(e.into()))?
.map_err(|e| Error::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
}
@ -82,12 +84,12 @@ impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Write(e.into()))?,
&key.encode(),
&value.compress(),
WriteFlags::UPSERT,
)
.map_err(|e| Error::Internal(e.into()))
.map_err(|e| Error::Write(e.into()))
}
fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, Error> {
@ -100,17 +102,17 @@ impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
self.inner
.del(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Delete(e.into()))?,
key.encode(),
data,
)
.map_err(|e| Error::Internal(e.into()))
.map_err(|e| Error::Delete(e.into()))
}
fn clear<T: Table>(&self) -> Result<(), Error> {
self.inner
.clear_db(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?)
.map_err(|e| Error::Internal(e.into()))?;
.clear_db(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Delete(e.into()))?)
.map_err(|e| Error::Delete(e.into()))?;
Ok(())
}

View File

@ -12,7 +12,6 @@ reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../net/rpc-types" }
async-trait = "0.1.57"
thiserror = "1.0.37"
eyre = "0.6.8"
auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
bytes = "1.2"

View File

@ -25,7 +25,7 @@ where
T: ScaleValue + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug,
{
fn decompress<B: Into<bytes::Bytes>>(value: B) -> Result<T, Error> {
decode_from_bytes(value.into()).map_err(|e| Error::Decode(e.into()))
decode_from_bytes(value.into()).map_err(|_| Error::DecodeError)
}
}

View File

@ -1,16 +1,31 @@
/// Database Error
#[derive(Debug, thiserror::Error)]
/// KV error type. They are using u32 to represent error code.
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum Error {
/// Encode errors.
#[error("A table encoding error:{0}")]
Encode(eyre::Error),
/// Decode errors.
#[error("A table decoding error:{0}")]
Decode(eyre::Error),
/// Initialization database error.
#[error("Initialization database error:{0}")]
Initialization(eyre::Error),
/// Internal DB error.
#[error("A internal database error:{0}")]
Internal(eyre::Error),
/// Failed to open database.
#[error("{0:?}")]
DatabaseLocation(u32),
/// Failed to create a table in database.
#[error("Table Creating error code: {0:?}")]
TableCreation(u32),
/// Failed to insert a value into a table.
#[error("Database write error code: {0:?}")]
Write(u32),
/// Failed to get a value into a table.
#[error("Database read error code: {0:?}")]
Read(u32),
/// Failed to delete a `(key, vakue)` pair into a table.
#[error("Database delete error code: {0:?}")]
Delete(u32),
/// Failed to commit transaction changes into the database.
#[error("Database commit error code: {0:?}")]
Commit(u32),
/// Failed to initiate a transaction.
#[error("Initialization of transaction errored with code: {0:?}")]
InitTransaction(u32),
/// Failed to initiate a cursor.
#[error("Initialization of cursor errored with code: {0:?}")]
InitCursor(u32),
/// Failed to decode a key from a table..
#[error("Error decoding value.")]
DecodeError,
}

View File

@ -8,11 +8,10 @@ pub mod tables;
use std::marker::PhantomData;
pub use container::DBContainer;
pub use error::Error;
pub use table::*;
pub use container::DBContainer;
// Sealed trait helper to prevent misuse of the API.
mod sealed {
pub trait Sealed: Sized {}

View File

@ -8,7 +8,6 @@ use crate::{
impl_fixed_arbitrary,
};
use bytes::Bytes;
use eyre::eyre;
use reth_codecs::main_codec;
use reth_primitives::{Account, Address, TxNumber};
use serde::{Deserialize, Serialize};
@ -61,11 +60,8 @@ impl Decode for TxNumberAddress {
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
let value: bytes::Bytes = value.into();
let num = u64::from_be_bytes(
value.as_ref()[..8]
.try_into()
.map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
);
let num =
u64::from_be_bytes(value.as_ref()[..8].try_into().map_err(|_| Error::DecodeError)?);
let hash = Address::from_slice(&value.slice(8..));
Ok(TxNumberAddress((num, hash)))

View File

@ -8,7 +8,6 @@ use crate::{
impl_fixed_arbitrary,
};
use bytes::Bytes;
use eyre::eyre;
use reth_primitives::{BlockHash, BlockNumber, H256};
use serde::{Deserialize, Serialize};
@ -60,11 +59,8 @@ impl Decode for BlockNumHash {
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
let value: bytes::Bytes = value.into();
let num = u64::from_be_bytes(
value.as_ref()[..8]
.try_into()
.map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
);
let num =
u64::from_be_bytes(value.as_ref()[..8].try_into().map_err(|_| Error::DecodeError)?);
let hash = H256::from_slice(&value.slice(8..));
Ok(BlockNumHash((num, hash)))

View File

@ -17,6 +17,6 @@ impl Compress for IntegerList {
impl Decompress for IntegerList {
fn decompress<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
IntegerList::from_bytes(&value.into()).map_err(|e| Error::Decode(eyre::eyre!("{e}")))
IntegerList::from_bytes(&value.into()).map_err(|_| Error::DecodeError)
}
}

View File

@ -14,7 +14,6 @@ use crate::db::{
table::{Decode, Encode},
Error,
};
use eyre::eyre;
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
macro_rules! impl_uints {
@ -35,7 +34,7 @@ macro_rules! impl_uints {
let value: bytes::Bytes = value.into();
Ok(
$name::from_be_bytes(
value.as_ref().try_into().map_err(|_| Error::Decode(eyre!("Into bytes error.")))?
value.as_ref().try_into().map_err(|_| Error::DecodeError)?
)
)
}

View File

@ -4,7 +4,6 @@ use crate::db::{
table::{Decode, Encode},
Error,
};
use eyre::eyre;
use reth_primitives::TxNumber;
/// Sometimes data can be too big to be saved for a single key. This helps out by dividing the data
@ -51,9 +50,7 @@ where
let tx_num_index = value.len() - 8;
let highest_tx_number = u64::from_be_bytes(
value.as_ref()[tx_num_index..]
.try_into()
.map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
value.as_ref()[tx_num_index..].try_into().map_err(|_| Error::DecodeError)?,
);
let key = T::decode(value.slice(..tx_num_index))?;

View File

@ -10,4 +10,7 @@ pub enum Error {
#[error(transparent)]
Consensus(#[from] crate::consensus::Error),
#[error(transparent)]
Database(#[from] crate::db::Error),
}

View File

@ -7,7 +7,7 @@ use reth_primitives::{
/// Client trait for fetching `Header` related data.
#[auto_impl(&)]
pub trait HeaderProvider: Send + Sync + 'static {
pub trait HeaderProvider: Send + Sync {
/// Check if block is known
fn is_known(&self, block_hash: &BlockHash) -> Result<bool> {
self.header(block_hash).map(|header| header.is_some())
@ -18,7 +18,7 @@ pub trait HeaderProvider: Send + Sync + 'static {
}
/// Client trait for fetching `Block` related data.
pub trait BlockProvider: Send + Sync + 'static {
pub trait BlockProvider: Send + Sync {
/// Returns the current info for the chain.
fn chain_info(&self) -> Result<ChainInfo>;

View File

@ -0,0 +1,30 @@
use crate::{
db::{tables, Database, DbTx},
provider::HeaderProvider,
};
/// Provider
pub struct DbProvider<DB: Database> {
/// Database
db: DB,
}
impl<DB: Database> DbProvider<DB> {
/// create new database provider
pub fn new(db: DB) -> Self {
Self { db }
}
}
impl<DB: Database> HeaderProvider for DbProvider<DB> {
fn header(
&self,
block_hash: &reth_primitives::BlockHash,
) -> crate::Result<Option<reth_primitives::Header>> {
self.db.view(|tx| tx.get::<tables::Headers>((0, *block_hash).into()))?.map_err(Into::into)
}
fn is_known(&self, block_hash: &reth_primitives::BlockHash) -> crate::Result<bool> {
self.header(block_hash).map(|header| header.is_some())
}
}

View File

@ -1,5 +1,7 @@
mod block;
mod db_provider;
mod storage;
pub use block::{BlockProvider, HeaderProvider};
pub use db_provider::DbProvider;
pub use storage::StorageProvider;

View File

@ -2,7 +2,7 @@ use crate::Result;
use reth_primitives::{rpc::BlockId, Address, H256, U256};
/// Provides access to storage data
pub trait StorageProvider: Send + Sync + 'static {
pub trait StorageProvider: Send + Sync {
/// Returns the value from a storage position at a given address and `BlockId`
fn storage_at(&self, address: Address, index: U256, at: BlockId) -> Result<Option<H256>>;
}

View File

@ -1,7 +1,6 @@
use crate::{error::mdbx_result, Error, TransactionKind};
use derive_more::*;
use std::{borrow::Cow, slice};
use thiserror::Error;
/// Implement this to be able to decode data values
pub trait TableObject<'tx> {
@ -96,13 +95,8 @@ impl<'tx, const LEN: usize> TableObject<'tx> for [u8; LEN] {
where
Self: Sized,
{
#[derive(Clone, Debug, Display, Error)]
struct InvalidSize<const LEN: usize> {
got: usize,
}
if data_val.len() != LEN {
return Err(Error::DecodeError(Box::new(InvalidSize::<LEN> { got: data_val.len() })))
return Err(Error::DecodeErrorLenDiff)
}
let mut a = [0; LEN];
a[..].copy_from_slice(data_val);

View File

@ -2,7 +2,7 @@ use libc::c_int;
use std::{ffi::CStr, fmt, result, str};
/// An MDBX error kind.
#[derive(Debug)]
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
pub enum Error {
KeyExist,
NotFound,
@ -29,11 +29,11 @@ pub enum Error {
Multival,
WannaRecovery,
KeyMismatch,
InvalidValue,
DecodeError,
Access,
TooLarge,
DecodeError(Box<dyn std::error::Error + Send + Sync + 'static>),
Other(c_int),
DecodeErrorLenDiff,
Other(u32),
}
impl Error {
@ -65,16 +65,16 @@ impl Error {
ffi::MDBX_EMULTIVAL => Error::Multival,
ffi::MDBX_WANNA_RECOVERY => Error::WannaRecovery,
ffi::MDBX_EKEYMISMATCH => Error::KeyMismatch,
ffi::MDBX_EINVAL => Error::InvalidValue,
ffi::MDBX_EINVAL => Error::DecodeError,
ffi::MDBX_EACCESS => Error::Access,
ffi::MDBX_TOO_LARGE => Error::TooLarge,
other => Error::Other(other),
other => Error::Other(other as u32),
}
}
/// Converts an [Error] to the raw error code.
fn to_err_code(&self) -> c_int {
match self {
pub fn to_err_code(&self) -> u32 {
let err_code = match self {
Error::KeyExist => ffi::MDBX_KEYEXIST,
Error::NotFound => ffi::MDBX_NOTFOUND,
Error::PageNotFound => ffi::MDBX_PAGE_NOTFOUND,
@ -99,31 +99,31 @@ impl Error {
Error::Multival => ffi::MDBX_EMULTIVAL,
Error::WannaRecovery => ffi::MDBX_WANNA_RECOVERY,
Error::KeyMismatch => ffi::MDBX_EKEYMISMATCH,
Error::InvalidValue => ffi::MDBX_EINVAL,
Error::DecodeError => ffi::MDBX_EINVAL,
Error::Access => ffi::MDBX_EACCESS,
Error::TooLarge => ffi::MDBX_TOO_LARGE,
Error::Other(err_code) => *err_code,
Error::Other(err_code) => *err_code as i32,
_ => unreachable!(),
}
};
err_code as u32
}
}
impl From<Error> for u32 {
fn from(value: Error) -> Self {
value.to_err_code()
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::DecodeError(reason) => write!(fmt, "{reason}"),
other => {
write!(fmt, "{}", unsafe {
let err = ffi::mdbx_strerror(other.to_err_code());
str::from_utf8_unchecked(CStr::from_ptr(err).to_bytes())
})
}
}
write!(fmt, "{}", unsafe {
let err = ffi::mdbx_strerror(self.to_err_code() as i32);
str::from_utf8_unchecked(CStr::from_ptr(err).to_bytes())
})
}
}
impl std::error::Error for Error {}
/// An MDBX result.
pub type Result<T> = result::Result<T, Error>;

View File

@ -19,8 +19,8 @@ use serde_json::Value;
#[async_trait::async_trait]
impl<Pool, Client> EthApiServer for EthApi<Pool, Client>
where
Pool: TransactionPool<Transaction = Transaction> + Clone,
Client: BlockProvider + StorageProvider,
Pool: TransactionPool<Transaction = Transaction> + Clone + 'static,
Client: BlockProvider + StorageProvider + 'static,
{
fn protocol_version(&self) -> Result<U64> {
Ok(self.protocol_version())