perf(db): reduce different repeated allocations (#2103)

Author: joshieDo
Date: 2023-04-13 23:33:53 +08:00
Committed by: GitHub
Parent: 2f003bf8ae
Commit: 019ddd1edc

32 changed files with 358 additions and 181 deletions

View File

@@ -32,4 +32,5 @@ enum StageEnum {
    Execution,
    Hashing,
    Merkle,
+   TxLookup,
}

View File

@@ -16,7 +16,7 @@ use reth_staged_sync::{
    Config,
};
use reth_stages::{
-   stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
+   stages::{BodyStage, ExecutionStage, SenderRecoveryStage, TransactionLookupStage},
    ExecInput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, sync::Arc};
@@ -181,6 +181,16 @@ impl Command {
                }
                stage.execute(&mut tx, input).await?;
            }
+           StageEnum::TxLookup => {
+               let mut stage = TransactionLookupStage::new(num_blocks);
+
+               // Unwind first
+               if !self.skip_unwind {
+                   stage.unwind(&mut tx, unwind).await?;
+               }
+
+               stage.execute(&mut tx, input).await?;
+           }
            _ => {}
        }

View File

@@ -1,5 +1,5 @@
use crate::{H256, KECCAK_EMPTY, U256};
-use bytes::{Buf, BufMut, Bytes};
+use bytes::{Buf, Bytes};
use fixed_hash::byteorder::{BigEndian, ReadBytesExt};
use reth_codecs::{main_codec, Compact};
use revm_primitives::{Bytecode as RevmBytecode, BytecodeState, JumpMap};
@@ -84,7 +84,10 @@ impl Deref for Bytecode {
}

impl Compact for Bytecode {
-   fn to_compact(self, buf: &mut impl BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        buf.put_u32(self.0.bytecode.len() as u32);
        buf.put_slice(self.0.bytecode.as_ref());
        let len = match self.0.state() {

View File

@@ -208,7 +208,10 @@ where
}

impl Compact for Bytes {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        let len = self.len();
        buf.put(self.0);
        len

View File

@@ -35,6 +35,14 @@ impl IntegerList {
        vec
    }

+   /// Serializes a [`IntegerList`] into a sequence of bytes.
+   pub fn to_mut_bytes<B: bytes::BufMut>(&self, buf: &mut B) {
+       let len = self.0.size_in_bytes();
+       let mut vec = Vec::with_capacity(len);
+       self.0.serialize_into(&mut vec).unwrap();
+       buf.put_slice(vec.as_slice());
+   }
+
    /// Deserializes a sequence of bytes into a proper [`IntegerList`].
    pub fn from_bytes(data: &[u8]) -> Result<Self, Error> {
        Ok(Self(EliasFano::deserialize_from(data).map_err(|_| Error::FailedDeserialize)?))

View File

@@ -22,7 +22,10 @@ impl From<(H256, U256)> for StorageEntry {
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
impl Compact for StorageEntry {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        // for now put full bytes and later compress it.
        buf.put_slice(&self.key.to_fixed_bytes()[..]);
        self.value.to_compact(buf) + 32

View File

@@ -35,7 +35,10 @@ impl From<TxType> for u8 {
}

impl Compact for TxType {
-   fn to_compact(self, _: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, _: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        match self {
            TxType::Legacy => 0,
            TxType::EIP2930 => 1,

View File

@@ -63,7 +63,10 @@ impl BranchNodeCompact {
}

impl Compact for BranchNodeCompact {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        let BranchNodeCompact { state_mask, tree_mask, hash_mask, root_hash, hashes } = self;
        let mut buf_size = 0;

View File

@@ -62,7 +62,10 @@ impl std::fmt::Debug for TrieMask {
}

impl Compact for TrieMask {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        buf.put_slice(self.to_be_bytes().as_slice());
        2
    }

View File

@@ -28,7 +28,10 @@ impl From<Vec<u8>> for StoredNibblesSubKey {
}

impl Compact for StoredNibblesSubKey {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        assert!(self.inner.len() <= 64);
        let mut padded = vec![0; 64];
        padded[..self.inner.len()].copy_from_slice(&self.inner[..]);

View File

@@ -15,7 +15,10 @@ pub struct StorageTrieEntry {
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
impl Compact for StorageTrieEntry {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        let nibbles_len = self.nibbles.to_compact(buf);
        let node_len = self.node.to_compact(buf);
        nibbles_len + node_len

View File

@@ -34,7 +34,7 @@ pub fn generate_from_to(ident: &Ident, fields: &FieldList) -> TokenStream2 {
        }

        impl Compact for #ident {
-           fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+           fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
                let mut flags = #flags::default();
                let mut total_len = 0;
                #(#to_compact)*

View File

@@ -249,7 +249,7 @@ mod tests {
        fuzz_test_test_struct(TestStruct::default())
    }

    impl Compact for TestStruct {
-       fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+       fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
            let mut flags = TestStructFlags::default();
            let mut total_len = 0;
            let mut buffer = bytes::BytesMut::new();

View File

@@ -21,7 +21,9 @@ use revm_primitives::{B160 as H160, B256 as H256, U256};
/// size array like `Vec<H256>`.
pub trait Compact {
    /// Takes a buffer which can be written to. *Ideally*, it returns the length written to.
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize;
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>;

    /// Takes a buffer which can be read from. Returns the object and `buf` with its internal cursor
    /// advanced (eg.`.advance(len)`).
    ///
@@ -33,8 +35,9 @@ pub trait Compact {
        Self: Sized;

    /// "Optional": If there's no good reason to use it, don't.
-   fn specialized_to_compact(self, buf: &mut impl bytes::BufMut) -> usize
+   fn specialized_to_compact<B>(self, buf: &mut B) -> usize
    where
+       B: bytes::BufMut + AsMut<[u8]>,
        Self: Sized,
    {
        self.to_compact(buf)
@@ -53,7 +56,7 @@ macro_rules! impl_uint_compact {
    ($($name:tt),+) => {
        $(
            impl Compact for $name {
-               fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+               fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
                    let leading = self.leading_zeros() as usize / 8;
                    buf.put_slice(&self.to_be_bytes()[leading..]);
                    std::mem::size_of::<$name>() - leading
@@ -82,15 +85,25 @@ where
    T: Compact + Default,
{
    /// Returns 0 since we won't include it in the `StructFlags`.
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        // TODO: can it be smaller?
        buf.put_u16(self.len() as u16);

        for element in self {
-           // TODO: elias fano?
-           let mut inner = Vec::with_capacity(32);
-           buf.put_u16(element.to_compact(&mut inner) as u16);
-           buf.put_slice(&inner);
+           let length_index = buf.as_mut().len();
+
+           // Placeholder for the length, since it can only be known after compacting the element
+           // and BufMut doesn't support going back
+           buf.put_slice(&[0, 0]);
+
+           let len = element.to_compact(buf);
+
+           // Replace placeholder with the real length
+           buf.as_mut()[length_index..=length_index + 1]
+               .copy_from_slice(&(len as u16).to_be_bytes());
        }
        0
    }
@@ -115,7 +128,10 @@ where
    }

    /// To be used by fixed sized types like `Vec<H256>`.
-   fn specialized_to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn specialized_to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        buf.put_u16(self.len() as u16);

        for element in self {
@@ -147,12 +163,23 @@ where
    T: Compact + Default,
{
    /// Returns 0 for `None` and 1 for `Some(_)`.
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        if let Some(element) = self {
-           let mut inner = vec![];
-           let len = element.to_compact(&mut inner);
-           buf.put_u16(len as u16);
-           buf.put_slice(&inner);
+           let length_index = buf.as_mut().len();
+
+           // Placeholder for the length, since it can only be known after compacting the element
+           // and BufMut doesn't support going back
+           buf.put_slice(&[0, 0]);
+
+           let len = element.to_compact(buf);
+
+           // Replace placeholder with the real length
+           buf.as_mut()[length_index..=length_index + 1]
+               .copy_from_slice(&(len as u16).to_be_bytes());

            return 1
        }
        0
@@ -172,7 +199,10 @@ where
    }

    /// To be used by fixed sized types like `Option<H256>`.
-   fn specialized_to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn specialized_to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        if let Some(element) = self {
            element.to_compact(buf);
            return 1
@@ -193,7 +223,10 @@ where
}

impl Compact for U256 {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        let inner: [u8; 32] = self.to_be_bytes();
        let size = 32 - (self.leading_zeros() / 8);
        buf.put_slice(&inner[32 - size..]);
@@ -213,7 +246,10 @@ impl Compact for U256 {
}

impl Compact for Bytes {
-   fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        let len = self.len();
        buf.put(self);
        len
@@ -229,7 +265,7 @@ macro_rules! impl_hash_compact {
    ($($name:tt),+) => {
        $(
            impl Compact for $name {
-               fn to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+               fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
                    buf.put_slice(&self.0);
                    std::mem::size_of::<$name>()
                }
@@ -246,7 +282,9 @@ macro_rules! impl_hash_compact {
                    (v, buf)
                }

-               fn specialized_to_compact(self, buf: &mut impl bytes::BufMut) -> usize {
+               fn specialized_to_compact<B>(self, buf: &mut B) -> usize
+               where
+                   B: bytes::BufMut + AsMut<[u8]> {
                    self.to_compact(buf)
                }
@@ -262,7 +300,10 @@ impl_hash_compact!(H256, H160);
impl Compact for bool {
    /// `bool` vars go directly to the `StructFlags` and are not written to the buffer.
-   fn to_compact(self, _: &mut impl bytes::BufMut) -> usize {
+   fn to_compact<B>(self, _: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        self as usize
    }
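The interesting change in this file is in the `Vec<T>` and `Option<T>` bodies: instead of compacting each element into a temporary `Vec` just to learn its length, the code reserves a two-byte placeholder, compacts the element directly into the destination, and backpatches the real length afterwards. That backpatch is what the new `AsMut<[u8]>` bound on `B` buys. A minimal, self-contained sketch of the same pattern (the function name is illustrative, not part of the PR):

use bytes::BufMut;

// Length-prefix a payload without a temporary allocation: reserve the prefix,
// write the body straight into the buffer, then patch the prefix in place.
fn put_with_len_prefix<B: BufMut + AsMut<[u8]>>(buf: &mut B, payload: &[u8]) {
    let length_index = buf.as_mut().len();   // where the two-byte prefix will live
    buf.put_slice(&[0, 0]);                  // placeholder, overwritten below
    buf.put_slice(payload);                  // no per-element scratch Vec
    buf.as_mut()[length_index..length_index + 2]
        .copy_from_slice(&(payload.len() as u16).to_be_bytes());
}

fn main() {
    let mut buf: Vec<u8> = Vec::new();       // Vec<u8> satisfies BufMut + AsMut<[u8]>
    put_with_len_prefix(&mut buf, b"hello");
    assert_eq!(buf, [0, 5, b'h', b'e', b'l', b'l', b'o']);
}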

View File

@@ -37,6 +37,7 @@ bytes = "1.4"
page_size = "0.4.2"
thiserror = "1.0.37"
tempfile = { version = "3.3.0", optional = true }
+parking_lot = "0.12"

# arbitrary utils
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }

View File

@@ -13,10 +13,22 @@ use std::{
/// Trait that will transform the data to be saved in the DB in a (ideally) compressed format
pub trait Compress: Send + Sync + Sized + Debug {
    /// Compressed type.
-   type Compressed: AsRef<[u8]> + Send + Sync;
+   type Compressed: bytes::BufMut + AsMut<[u8]> + Default + AsRef<[u8]> + Send + Sync;
+
+   /// If the type cannot be compressed, return its inner reference as `Some(self.as_ref())`
+   fn uncompressable_ref(&self) -> Option<&[u8]> {
+       None
+   }

    /// Compresses data going into the database.
-   fn compress(self) -> Self::Compressed;
+   fn compress(self) -> Self::Compressed {
+       let mut buf = Self::Compressed::default();
+       self.compress_to_buf(&mut buf);
+       buf
+   }
+
+   /// Compresses data to a given buffer.
+   fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B);
}

/// Trait that will transform the data to be read from the DB.
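With `compress_to_buf` as the only required method and `compress` now defaulted on top of it, hot paths that already hold a reusable buffer can skip the owned `Self::Compressed` allocation entirely, while existing callers of `compress` keep working. A small self-contained sketch of that shape (the `MiniCompress` trait and `Nonce` type are illustrative stand-ins, not reth types):

use bytes::BufMut;

trait MiniCompress: Sized {
    type Compressed: BufMut + AsMut<[u8]> + AsRef<[u8]> + Default;

    // Old entry point, now derived from the buffer-based one.
    fn compress(self) -> Self::Compressed {
        let mut buf = Self::Compressed::default();
        self.compress_to_buf(&mut buf);
        buf
    }

    // The one method implementors actually write.
    fn compress_to_buf<B: BufMut + AsMut<[u8]>>(self, buf: &mut B);
}

struct Nonce(u64);

impl MiniCompress for Nonce {
    type Compressed = Vec<u8>;

    fn compress_to_buf<B: BufMut + AsMut<[u8]>>(self, buf: &mut B) {
        buf.put_u64(self.0); // written straight into the caller's buffer
    }
}

fn main() {
    // Hot path: one buffer reused across values, no per-value allocation.
    let mut shared = Vec::with_capacity(64);
    for n in 0u64..3 {
        shared.clear();
        Nonce(n).compress_to_buf(&mut shared);
        assert_eq!(shared.len(), 8);
    }
    // Cold path: the owned form still exists via the default method.
    assert_eq!(Nonce(7).compress().len(), 8);
}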

View File

@@ -28,6 +28,8 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> {
    pub table: &'static str,
    /// Phantom data to enforce encoding/decoding.
    pub _dbi: std::marker::PhantomData<T>,
+   /// Cache buffer that receives compressed values.
+   pub buf: Vec<u8>,
}

/// Takes `(key, value)` from the database and decodes it appropriately.
@@ -38,6 +40,20 @@ macro_rules! decode {
    };
}

+/// Some types don't support compression (eg. H256), and we don't want to be copying them to the
+/// allocated buffer when we can just use their reference.
+macro_rules! compress_or_ref {
+   ($self:expr, $value:expr) => {
+       if let Some(value) = $value.uncompressable_ref() {
+           value
+       } else {
+           $self.buf.truncate(0);
+           $value.compress_to_buf(&mut $self.buf);
+           $self.buf.as_ref()
+       }
+   };
+}
+
impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> {
    fn first(&mut self) -> PairResult<T> {
        decode!(self.inner.first())
@@ -212,13 +228,13 @@ impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> {
    fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
        // Default `WriteFlags` is UPSERT
        self.inner
-           .put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::UPSERT)
+           .put(key.encode().as_ref(), compress_or_ref!(self, value), WriteFlags::UPSERT)
            .map_err(|e| Error::Write(e.into()))
    }

    fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
        self.inner
-           .put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::NO_OVERWRITE)
+           .put(key.encode().as_ref(), compress_or_ref!(self, value), WriteFlags::NO_OVERWRITE)
            .map_err(|e| Error::Write(e.into()))
    }

@@ -226,7 +242,7 @@ impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> {
    /// will fail if the inserted key is less than the last table key
    fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
        self.inner
-           .put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND)
+           .put(key.encode().as_ref(), compress_or_ref!(self, value), WriteFlags::APPEND)
            .map_err(|e| Error::Write(e.into()))
    }

@@ -242,7 +258,7 @@ impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for Cursor<'tx, RW, T> {
    fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
        self.inner
-           .put(key.encode().as_ref(), value.compress().as_ref(), WriteFlags::APPEND_DUP)
+           .put(key.encode().as_ref(), compress_or_ref!(self, value), WriteFlags::APPEND_DUP)
            .map_err(|e| Error::Write(e.into()))
    }
}
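The write methods above all funnel through `compress_or_ref!`: a value that can expose its raw bytes via `uncompressable_ref` is handed to libmdbx as-is, and anything else is compacted into the cursor's single `buf`, which is truncated and reused on every call, so a long `append` loop touches one allocation instead of one per row. A self-contained sketch of that branch (the `MiniCompress` trait, `CursorLike`, and `put_raw` are illustrative stand-ins, not reth APIs):

use bytes::BufMut;

trait MiniCompress: Sized {
    fn uncompressable_ref(&self) -> Option<&[u8]> {
        None
    }
    fn compress_to_buf<B: BufMut + AsMut<[u8]>>(self, buf: &mut B);
}

struct Hash([u8; 32]); // fixed-size: written raw, never copied into the buffer

impl MiniCompress for Hash {
    fn uncompressable_ref(&self) -> Option<&[u8]> {
        Some(&self.0)
    }
    fn compress_to_buf<B: BufMut + AsMut<[u8]>>(self, buf: &mut B) {
        buf.put_slice(&self.0)
    }
}

struct Counter(u64); // variable-size once leading zero bytes are dropped

impl MiniCompress for Counter {
    fn compress_to_buf<B: BufMut + AsMut<[u8]>>(self, buf: &mut B) {
        let lead = self.0.leading_zeros() as usize / 8;
        buf.put_slice(&self.0.to_be_bytes()[lead..]);
    }
}

fn put_raw(bytes: &[u8]) {
    // stand-in for handing the slice to the libmdbx `put` call
    println!("put {} bytes", bytes.len());
}

struct CursorLike {
    buf: Vec<u8>, // allocated once per cursor, reused for every write
}

impl CursorLike {
    fn write<V: MiniCompress>(&mut self, value: V) {
        if let Some(raw) = value.uncompressable_ref() {
            put_raw(raw); // zero copies for hash-like values
        } else {
            self.buf.truncate(0); // keeps capacity, drops old contents
            value.compress_to_buf(&mut self.buf);
            put_raw(&self.buf);
        }
    }
}

fn main() {
    let mut cursor = CursorLike { buf: Vec::with_capacity(64) };
    cursor.write(Hash([0xAA; 32]));
    for n in [1u64, 300, 70_000] {
        cursor.write(Counter(n)); // the same buffer serves every iteration
    }
}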

View File

@@ -3,19 +3,22 @@
use super::cursor::Cursor;
use crate::{
    table::{Compress, DupSort, Encode, Table, TableImporter},
-   tables::utils::decode_one,
+   tables::{utils::decode_one, NUM_TABLES, TABLES},
    transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
    Error,
};
use metrics::histogram;
-use reth_libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW};
-use std::{marker::PhantomData, time::Instant};
+use parking_lot::RwLock;
+use reth_libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, DBI, RW};
+use std::{marker::PhantomData, sync::Arc, time::Instant};

/// Wrapper for the libmdbx transaction.
#[derive(Debug)]
pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> {
    /// Libmdbx-sys transaction.
    pub inner: Transaction<'a, K, E>,
+   /// Database table handle cache
+   pub db_handles: Arc<RwLock<[Option<DBI>; NUM_TABLES]>>,
}

impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
@@ -24,7 +27,7 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
    where
        'a: 'env,
    {
-       Self { inner }
+       Self { inner, db_handles: Default::default() }
    }

    /// Gets this transaction ID.
@@ -32,17 +35,36 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
        self.inner.id()
    }

+   /// Gets a table database handle if it exists, otherwise creates it.
+   pub fn get_dbi<T: Table>(&self) -> Result<DBI, Error> {
+       let mut handles = self.db_handles.write();
+
+       let table_index = TABLES
+           .iter()
+           .enumerate()
+           .find_map(|(idx, (_, table))| (table == &T::NAME).then_some(idx))
+           .expect("Requested table should be part of `TABLES`.");
+
+       let dbi_handle = handles.get_mut(table_index).expect("should exist");
+       if dbi_handle.is_none() {
+           *dbi_handle = Some(
+               self.inner.open_db(Some(T::NAME)).map_err(|e| Error::InitCursor(e.into()))?.dbi(),
+           );
+       }
+
+       Ok(dbi_handle.expect("is some; qed"))
+   }
+
    /// Create db Cursor
    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<'env, K, T>, Error> {
        Ok(Cursor {
            inner: self
                .inner
-               .cursor(
-                   &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::InitCursor(e.into()))?,
-               )
+               .cursor_with_dbi(self.get_dbi::<T>()?)
                .map_err(|e| Error::InitCursor(e.into()))?,
            table: T::NAME,
            _dbi: PhantomData,
+           buf: vec![],
        })
    }
}
@@ -83,10 +105,7 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, Error> {
        self.inner
-           .get(
-               &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Read(e.into()))?,
-               key.encode().as_ref(),
-           )
+           .get(self.get_dbi::<T>()?, key.encode().as_ref())
            .map_err(|e| Error::Read(e.into()))?
            .map(decode_one::<T>)
            .transpose()
@@ -96,12 +115,7 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), Error> {
        self.inner
-           .put(
-               &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Write(e.into()))?,
-               &key.encode(),
-               &value.compress(),
-               WriteFlags::UPSERT,
-           )
+           .put(self.get_dbi::<T>()?, &key.encode(), &value.compress(), WriteFlags::UPSERT)
            .map_err(|e| Error::Write(e.into()))
    }

@@ -114,18 +128,12 @@ impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {
        };

        self.inner
-           .del(
-               &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Delete(e.into()))?,
-               key.encode(),
-               data,
-           )
+           .del(self.get_dbi::<T>()?, key.encode(), data)
            .map_err(|e| Error::Delete(e.into()))
    }

    fn clear<T: Table>(&self) -> Result<(), Error> {
-       self.inner
-           .clear_db(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Delete(e.into()))?)
-           .map_err(|e| Error::Delete(e.into()))?;
+       self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| Error::Delete(e.into()))?;

        Ok(())
    }
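Previously every `get`, `put`, `delete`, and cursor creation re-opened the table via `open_db`, which goes through libmdbx each time; `get_dbi` above resolves the handle once per table per transaction and stores it in a fixed-size, lock-guarded array shared with all cursors. A self-contained sketch of the same lazy cache (the table list, `Dbi` alias, and `open` closure are illustrative stand-ins for the libmdbx calls):

use parking_lot::RwLock;

const NUM_TABLES: usize = 3;
const TABLES: [&str; NUM_TABLES] = ["Headers", "Bodies", "Receipts"]; // illustrative

type Dbi = u32; // stand-in for ffi::MDBX_dbi

struct TxLike {
    // One slot per table, filled on first use and then reused.
    db_handles: RwLock<[Option<Dbi>; NUM_TABLES]>,
}

impl TxLike {
    fn get_dbi(&self, table: &str, open: impl FnOnce() -> Dbi) -> Dbi {
        let index = TABLES
            .iter()
            .position(|name| *name == table)
            .expect("requested table should be part of TABLES");

        let mut handles = self.db_handles.write();
        *handles[index].get_or_insert_with(open) // open once, then hit the cache
    }
}

fn main() {
    let tx = TxLike { db_handles: RwLock::new([None; NUM_TABLES]) };
    let mut opens = 0;
    for _ in 0..3 {
        // `open` stands in for `Transaction::open_db(..).dbi()`; it runs only once.
        let dbi = tx.get_dbi("Bodies", || {
            opens += 1;
            7
        });
        assert_eq!(dbi, 7);
    }
    assert_eq!(opens, 1);
}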

View File

@@ -14,10 +14,8 @@ macro_rules! impl_compression_for_compact {
        {
            type Compressed = Vec<u8>;

-           fn compress(self) -> Self::Compressed {
-               let mut buf = vec![];
-               let _ = Compact::to_compact(self, &mut buf);
-               buf
+           fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
+               let _ = Compact::to_compact(self, buf);
            }
        }
@@ -52,7 +50,37 @@ impl_compression_for_compact!(
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);
impl_compression_for_compact!(CompactU256);
-impl_compression_for_compact!(H256, H160);
+
+macro_rules! impl_compression_fixed_compact {
+   ($($name:tt),+) => {
+       $(
+           impl Compress for $name
+           {
+               type Compressed = Vec<u8>;
+
+               fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
+                   let _ = Compact::to_compact(self, buf);
+               }
+
+               fn uncompressable_ref(&self) -> Option<&[u8]> {
+                   Some(self.as_ref())
+               }
+           }
+
+           impl Decompress for $name
+           {
+               fn decompress<B: AsRef<[u8]>>(value: B) -> Result<$name, Error> {
+                   let value = value.as_ref();
+                   let (obj, _) = Compact::from_compact(&value, value.len());
+                   Ok(obj)
+               }
+           }
+       )+
+   };
+}
+impl_compression_fixed_compact!(H256, H160);

/// Adds wrapper structs for some primitive types so they can use StructFlags from Compact, when
/// used as pure table values.

View File

@@ -17,6 +17,10 @@ where
{
    type Compressed = Vec<u8>;

+   fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
+       buf.put_slice(&parity_scale_codec::Encode::encode(&self))
+   }
+
    fn compress(self) -> Self::Compressed {
        parity_scale_codec::Encode::encode(&self)
    }

View File

@@ -47,8 +47,11 @@ pub enum TableType {
    DupSort,
}

+/// Number of tables that should be present inside database.
+pub const NUM_TABLES: usize = 25;
+
/// Default tables that should be present inside database.
-pub const TABLES: [(TableType, &str); 25] = [
+pub const TABLES: [(TableType, &str); NUM_TABLES] = [
    (TableType::Table, CanonicalHeaders::const_name()),
    (TableType::Table, HeaderTD::const_name()),
    (TableType::Table, HeaderNumbers::const_name()),

View File

@@ -6,7 +6,7 @@ use crate::{
    Error,
};
use reth_codecs::Compact;
-use reth_primitives::{bytes::BufMut, Account, Address, TransitionId};
+use reth_primitives::{Account, Address, TransitionId};
use serde::{Deserialize, Serialize};

/// Account as it is saved inside [`AccountChangeSet`][crate::tables::AccountChangeSet].
@@ -24,7 +24,10 @@ pub struct AccountBeforeTx {
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
impl Compact for AccountBeforeTx {
-   fn to_compact(self, buf: &mut impl BufMut) -> usize {
+   fn to_compact<B>(self, buf: &mut B) -> usize
+   where
+       B: bytes::BufMut + AsMut<[u8]>,
+   {
        // for now put full bytes and later compress it.
        buf.put_slice(&self.address.to_fixed_bytes()[..]);
        self.info.to_compact(buf) + 32

View File

@@ -12,6 +12,9 @@ impl Compress for IntegerList {
    fn compress(self) -> Self::Compressed {
        self.to_bytes()
    }
+
+   fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
+       self.to_mut_bytes(buf)
+   }
}

impl Decompress for IntegerList {
impl Decompress for IntegerList { impl Decompress for IntegerList {

View File

@@ -108,6 +108,15 @@ impl<V: Value> Compress for RawValue<V> {
    fn compress(self) -> Self::Compressed {
        self.value
    }
+
+   fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
+       buf.put_slice(self.value.as_slice())
+   }
+
+   fn uncompressable_ref(&self) -> Option<&[u8]> {
+       // Already compressed
+       Some(&self.value)
+   }
}

impl<V: Value> Decompress for RawValue<V> {

View File

@@ -22,7 +22,7 @@ fn bench_get_rand(c: &mut Criterion) {
        b.iter(|| {
            let mut i = 0usize;
            for key in &keys {
-               i += *txn.get::<ObjectLength>(&db, key.as_bytes()).unwrap().unwrap();
+               i += *txn.get::<ObjectLength>(db.dbi(), key.as_bytes()).unwrap().unwrap();
            }
            black_box(i);
        })
@@ -76,7 +76,7 @@ fn bench_put_rand(c: &mut Criterion) {
        b.iter(|| {
            let txn = env.begin_rw_txn().unwrap();
            for (key, data) in items.iter() {
-               txn.put(&db, key, data, WriteFlags::empty()).unwrap();
+               txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
            }
        })
    });

View File

@@ -17,7 +17,7 @@ pub fn setup_bench_db(num_rows: u32) -> (TempDir, Environment<NoWriteMap>) {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
    for i in 0..num_rows {
-       txn.put(&db, get_key(i), get_data(i), WriteFlags::empty()).unwrap();
+       txn.put(db.dbi(), get_key(i), get_data(i), WriteFlags::empty()).unwrap();
    }
    txn.commit().unwrap();
}

View File

@@ -1,5 +1,4 @@
use crate::{
-   database::Database,
    error::{mdbx_result, Error, Result},
    flags::*,
    mdbx_try_optional,
@@ -32,15 +31,13 @@ where
{
    pub(crate) fn new<E: EnvironmentKind>(
        txn: &'txn Transaction<K, E>,
-       db: &Database<'txn>,
+       dbi: ffi::MDBX_dbi,
    ) -> Result<Self> {
        let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
        let txn = txn.txn_mutex();
        unsafe {
-           mdbx_result(txn_execute(&txn, |txn| {
-               ffi::mdbx_cursor_open(txn, db.dbi(), &mut cursor)
-           }))?;
+           mdbx_result(txn_execute(&txn, |txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?;
        }
        Ok(Self { txn, cursor, _marker: PhantomData })
    }

View File

@@ -13,6 +13,7 @@ pub use crate::{
    flags::*,
    transaction::{Transaction, TransactionKind, RO, RW},
};
+pub use ffi::MDBX_dbi as DBI;

mod codec;
mod cursor;
@@ -52,7 +53,7 @@ mod test_utils {
        LittleEndian::write_u64(&mut value, height);

        let tx = env.begin_rw_txn().expect("begin_rw_txn");
        let index = tx.create_db(None, DatabaseFlags::DUP_SORT).expect("open index db");
-       tx.put(&index, HEIGHT_KEY, value, WriteFlags::empty()).expect("tx.put");
+       tx.put(index.dbi(), HEIGHT_KEY, value, WriteFlags::empty()).expect("tx.put");
        tx.commit().expect("tx.commit");
    }
}
} }

View File

@@ -123,7 +123,7 @@ where
    /// returned. Retrieval of other items requires the use of
    /// [Cursor]. If the item is not in the database, then
    /// [None] will be returned.
-   pub fn get<'txn, Key>(&'txn self, db: &Database<'txn>, key: &[u8]) -> Result<Option<Key>>
+   pub fn get<'txn, Key>(&'txn self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
    where
        Key: TableObject<'txn>,
    {
@@ -132,7 +132,7 @@ where
        let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };

        txn_execute(&self.txn, |txn| unsafe {
-           match ffi::mdbx_get(txn, db.dbi(), &key_val, &mut data_val) {
+           match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
                ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, &data_val).map(Some),
                ffi::MDBX_NOTFOUND => Ok(None),
                err_code => Err(Error::from_err_code(err_code)),
@@ -213,7 +218,12 @@ where
    /// Open a new cursor on the given database.
    pub fn cursor<'txn>(&'txn self, db: &Database<'txn>) -> Result<Cursor<'txn, K>> {
-       Cursor::new(self, db)
+       Cursor::new(self, db.dbi())
+   }
+
+   /// Open a new cursor on the given dbi.
+   pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<'_, K>> {
+       Cursor::new(self, dbi)
    }
}
@@ -263,9 +268,9 @@ where
    /// behavior is to enter the new key/data pair, replacing any previously
    /// existing key if duplicates are disallowed, or adding a duplicate data
    /// item if duplicates are allowed ([DatabaseFlags::DUP_SORT]).
-   pub fn put<'txn>(
-       &'txn self,
-       db: &Database<'txn>,
+   pub fn put(
+       &self,
+       dbi: ffi::MDBX_dbi,
        key: impl AsRef<[u8]>,
        data: impl AsRef<[u8]>,
        flags: WriteFlags,
@@ -277,7 +282,7 @@ where
        let mut data_val: ffi::MDBX_val =
            ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
        mdbx_result(txn_execute(&self.txn, |txn| unsafe {
-           ffi::mdbx_put(txn, db.dbi(), &key_val, &mut data_val, flags.bits())
+           ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
        }))?;

        Ok(())
@@ -321,9 +326,9 @@ where
    /// be deleted.
    ///
    /// Returns `true` if the key/value pair was present.
-   pub fn del<'txn>(
-       &'txn self,
-       db: &Database<'txn>,
+   pub fn del(
+       &self,
+       dbi: ffi::MDBX_dbi,
        key: impl AsRef<[u8]>,
        data: Option<&[u8]>,
    ) -> Result<bool> {
@@ -338,9 +343,9 @@ where
        mdbx_result({
            txn_execute(&self.txn, |txn| {
                if let Some(d) = data_val {
-                   unsafe { ffi::mdbx_del(txn, db.dbi(), &key_val, &d) }
+                   unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
                } else {
-                   unsafe { ffi::mdbx_del(txn, db.dbi(), &key_val, ptr::null()) }
+                   unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
                }
            })
        })
@@ -352,8 +357,8 @@ where
    }

    /// Empties the given database. All items will be removed.
-   pub fn clear_db<'txn>(&'txn self, db: &Database<'txn>) -> Result<()> {
-       mdbx_result(txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_drop(txn, db.dbi(), false) }))?;
+   pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
+       mdbx_result(txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
        Ok(())
    }
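These entry points now take a raw `MDBX_dbi` instead of a borrowed `Database`, so a handle can be resolved once and reused for reads, writes, and cursors within a transaction. A usage sketch along the lines of the updated tests below (environment setup is elided; the imports are assumed to match the crate's re-exports):

use reth_libmdbx::{Environment, NoWriteMap, WriteFlags};

fn roundtrip(env: &Environment<NoWriteMap>) {
    let txn = env.begin_rw_txn().unwrap();
    let dbi = txn.open_db(None).unwrap().dbi(); // resolve the handle once

    txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
    assert_eq!(txn.get(dbi, b"key1").unwrap(), Some(*b"val1"));

    {
        // Cursors share the same handle instead of re-opening the database.
        let mut cursor = txn.cursor_with_dbi(dbi).unwrap();
        assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
    }

    txn.commit().unwrap();
}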

View File

@@ -14,9 +14,9 @@ fn test_get() {
    assert_eq!(None, txn.cursor(&db).unwrap().first::<(), ()>().unwrap());

-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();

    let mut cursor = txn.cursor(&db).unwrap();
    assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
@@ -36,12 +36,12 @@ fn test_get_dup() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();

    let mut cursor = txn.cursor(&db).unwrap();
    assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
@@ -80,12 +80,12 @@ fn test_get_dupfixed() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val4", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val5", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val6", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val4", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val5", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val6", WriteFlags::empty()).unwrap();

    let mut cursor = txn.cursor(&db).unwrap();
    assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
@@ -109,7 +109,7 @@ fn test_iter() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
    for (key, data) in &items {
-       txn.put(&db, key, data, WriteFlags::empty()).unwrap();
+       txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
    }
    assert!(!txn.commit().unwrap());
}
@@ -218,7 +218,7 @@ fn test_iter_dup() {
    let txn = env.begin_rw_txn().unwrap();
    for (key, data) in items.clone() {
        let db = txn.open_db(None).unwrap();
-       txn.put(&db, key, data, WriteFlags::empty()).unwrap();
+       txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
    }
    txn.commit().unwrap();
}
@@ -289,7 +289,7 @@ fn test_iter_del_get() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
    for (key, data) in &items {
-       txn.put(&db, key, data, WriteFlags::empty()).unwrap();
+       txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
    }
    txn.commit().unwrap();
}

View File

@@ -101,7 +101,8 @@ fn test_stat() {
        let mut value = [0u8; 8];
        LittleEndian::write_u64(&mut value, i);
        let tx = env.begin_rw_txn().expect("begin_rw_txn");
-       tx.put(&tx.open_db(None).unwrap(), value, value, WriteFlags::default()).expect("tx.put");
+       tx.put(tx.open_db(None).unwrap().dbi(), value, value, WriteFlags::default())
+           .expect("tx.put");
        tx.commit().expect("tx.commit");
    }
@@ -143,11 +144,12 @@ fn test_freelist() {
        let mut value = [0u8; 8];
        LittleEndian::write_u64(&mut value, i);
        let tx = env.begin_rw_txn().expect("begin_rw_txn");
-       tx.put(&tx.open_db(None).unwrap(), value, value, WriteFlags::default()).expect("tx.put");
+       tx.put(tx.open_db(None).unwrap().dbi(), value, value, WriteFlags::default())
+           .expect("tx.put");
        tx.commit().expect("tx.commit");
    }
    let tx = env.begin_rw_txn().expect("begin_rw_txn");
-   tx.clear_db(&tx.open_db(None).unwrap()).expect("clear");
+   tx.clear_db(tx.open_db(None).unwrap().dbi()).expect("clear");
    tx.commit().expect("tx.commit");

    // Freelist should not be empty after clear_db.

View File

@@ -16,20 +16,20 @@ fn test_put_get_del() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   assert_eq!(txn.get(&db, b"key1").unwrap(), Some(*b"val1"));
-   assert_eq!(txn.get(&db, b"key2").unwrap(), Some(*b"val2"));
-   assert_eq!(txn.get(&db, b"key3").unwrap(), Some(*b"val3"));
-   assert_eq!(txn.get::<()>(&db, b"key").unwrap(), None);
+   assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1"));
+   assert_eq!(txn.get(db.dbi(), b"key2").unwrap(), Some(*b"val2"));
+   assert_eq!(txn.get(db.dbi(), b"key3").unwrap(), Some(*b"val3"));
+   assert_eq!(txn.get::<()>(db.dbi(), b"key").unwrap(), None);

-   txn.del(&db, b"key1", None).unwrap();
-   assert_eq!(txn.get::<()>(&db, b"key1").unwrap(), None);
+   txn.del(db.dbi(), b"key1", None).unwrap();
+   assert_eq!(txn.get::<()>(db.dbi(), b"key1").unwrap(), None);
}

#[test]
@@ -39,15 +39,15 @@ fn test_put_get_del_multi() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    let txn = env.begin_rw_txn().unwrap();
@@ -62,8 +62,8 @@ fn test_put_get_del_multi() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.del(&db, b"key1", Some(b"val2")).unwrap();
-   txn.del(&db, b"key2", None).unwrap();
+   txn.del(db.dbi(), b"key1", Some(b"val2")).unwrap();
+   txn.del(db.dbi(), b"key2", None).unwrap();
    txn.commit().unwrap();

    let txn = env.begin_rw_txn().unwrap();
@@ -87,15 +87,15 @@ fn test_put_get_del_empty_key() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, Default::default()).unwrap();
-   txn.put(&db, b"", b"hello", WriteFlags::empty()).unwrap();
-   assert_eq!(txn.get(&db, b"").unwrap(), Some(*b"hello"));
+   txn.put(db.dbi(), b"", b"hello", WriteFlags::empty()).unwrap();
+   assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b"hello"));
    txn.commit().unwrap();

    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   assert_eq!(txn.get(&db, b"").unwrap(), Some(*b"hello"));
-   txn.put(&db, b"", b"", WriteFlags::empty()).unwrap();
-   assert_eq!(txn.get(&db, b"").unwrap(), Some(*b""));
+   assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b"hello"));
+   txn.put(db.dbi(), b"", b"", WriteFlags::empty()).unwrap();
+   assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b""));
}

#[test]
@@ -113,11 +113,11 @@ fn test_reserve() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   assert_eq!(txn.get(&db, b"key1").unwrap(), Some(*b"val1"));
-   assert_eq!(txn.get::<()>(&db, b"key").unwrap(), None);
+   assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1"));
+   assert_eq!(txn.get::<()>(db.dbi(), b"key").unwrap(), None);

-   txn.del(&db, b"key1", None).unwrap();
-   assert_eq!(txn.get::<()>(&db, b"key1").unwrap(), None);
+   txn.del(db.dbi(), b"key1", None).unwrap();
+   assert_eq!(txn.get::<()>(db.dbi(), b"key1").unwrap(), None);
}

#[test]
@@ -126,19 +126,19 @@ fn test_nested_txn() {
    let env = Environment::new().open(dir.path()).unwrap();
    let mut txn = env.begin_rw_txn().unwrap();
-   txn.put(&txn.open_db(None).unwrap(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(txn.open_db(None).unwrap().dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();

    {
        let nested = txn.begin_nested_txn().unwrap();
        let db = nested.open_db(None).unwrap();
-       nested.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-       assert_eq!(nested.get(&db, b"key1").unwrap(), Some(*b"val1"));
-       assert_eq!(nested.get(&db, b"key2").unwrap(), Some(*b"val2"));
+       nested.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+       assert_eq!(nested.get(db.dbi(), b"key1").unwrap(), Some(*b"val1"));
+       assert_eq!(nested.get(db.dbi(), b"key2").unwrap(), Some(*b"val2"));
    }

    let db = txn.open_db(None).unwrap();
-   assert_eq!(txn.get(&db, b"key1").unwrap(), Some(*b"val1"));
-   assert_eq!(txn.get::<()>(&db, b"key2").unwrap(), None);
+   assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1"));
+   assert_eq!(txn.get::<()>(db.dbi(), b"key2").unwrap(), None);
}

#[test]
@@ -148,18 +148,18 @@ fn test_clear_db() {
    {
        let txn = env.begin_rw_txn().unwrap();
-       txn.put(&txn.open_db(None).unwrap(), b"key", b"val", WriteFlags::empty()).unwrap();
+       txn.put(txn.open_db(None).unwrap().dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
        assert!(!txn.commit().unwrap());
    }

    {
        let txn = env.begin_rw_txn().unwrap();
-       txn.clear_db(&txn.open_db(None).unwrap()).unwrap();
+       txn.clear_db(txn.open_db(None).unwrap().dbi()).unwrap();
        assert!(!txn.commit().unwrap());
    }

    let txn = env.begin_ro_txn().unwrap();
-   assert_eq!(txn.get::<()>(&txn.open_db(None).unwrap(), b"key").unwrap(), None);
+   assert_eq!(txn.get::<()>(txn.open_db(None).unwrap().dbi(), b"key").unwrap(), None);
}

#[test]
@@ -171,7 +171,7 @@ fn test_drop_db() {
    {
        let txn = env.begin_rw_txn().unwrap();
        txn.put(
-           &txn.create_db(Some("test"), DatabaseFlags::empty()).unwrap(),
+           txn.create_db(Some("test"), DatabaseFlags::empty()).unwrap().dbi(),
            b"key",
            b"val",
            WriteFlags::empty(),
@@ -219,14 +219,14 @@ fn test_concurrent_readers_single_writer() {
            {
                let txn = reader_env.begin_ro_txn().unwrap();
                let db = txn.open_db(None).unwrap();
-               assert_eq!(txn.get::<()>(&db, key).unwrap(), None);
+               assert_eq!(txn.get::<()>(db.dbi(), key).unwrap(), None);
            }
            reader_barrier.wait();
            reader_barrier.wait();
            {
                let txn = reader_env.begin_ro_txn().unwrap();
                let db = txn.open_db(None).unwrap();
-               txn.get::<[u8; 3]>(&db, key).unwrap().unwrap() == *val
+               txn.get::<[u8; 3]>(db.dbi(), key).unwrap().unwrap() == *val
            }
        }));
    }
@@ -235,7 +235,7 @@ fn test_concurrent_readers_single_writer() {
    let db = txn.open_db(None).unwrap();

    barrier.wait();
-   txn.put(&db, key, val, WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), key, val, WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    barrier.wait();
@@ -260,7 +260,8 @@ fn test_concurrent_writers() {
        threads.push(thread::spawn(move || {
            let txn = writer_env.begin_rw_txn().unwrap();
            let db = txn.open_db(None).unwrap();
-           txn.put(&db, format!("{key}{i}"), format!("{val}{i}"), WriteFlags::empty()).unwrap();
+           txn.put(db.dbi(), format!("{key}{i}"), format!("{val}{i}"), WriteFlags::empty())
+               .unwrap();
            txn.commit().is_ok()
        }));
    }
@@ -272,7 +273,7 @@ fn test_concurrent_writers() {
    for i in 0..n {
        assert_eq!(
            Cow::<Vec<u8>>::Owned(format!("{val}{i}").into_bytes()),
-           txn.get(&db, format!("{key}{i}").as_bytes()).unwrap().unwrap()
+           txn.get(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap()
        );
    }
}
@@ -284,9 +285,9 @@ fn test_stat() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    {
@@ -298,8 +299,8 @@ fn test_stat() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.del(&db, b"key1", None).unwrap();
-   txn.del(&db, b"key2", None).unwrap();
+   txn.del(db.dbi(), b"key1", None).unwrap();
+   txn.del(db.dbi(), b"key2", None).unwrap();
    txn.commit().unwrap();

    {
@@ -311,9 +312,9 @@ fn test_stat() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.put(&db, b"key4", b"val4", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key5", b"val5", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key6", b"val6", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key4", b"val4", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key5", b"val5", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key6", b"val6", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    {
@@ -331,15 +332,15 @@ fn test_stat_dupsort() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
-   txn.put(&db, b"key1", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key1", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key2", b"val3", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key3", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    {
@@ -350,8 +351,8 @@ fn test_stat_dupsort() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.del(&db, b"key1", Some(b"val2")).unwrap();
-   txn.del(&db, b"key2", None).unwrap();
+   txn.del(db.dbi(), b"key1", Some(b"val2")).unwrap();
+   txn.del(db.dbi(), b"key2", None).unwrap();
    txn.commit().unwrap();

    {
@@ -362,9 +363,9 @@ fn test_stat_dupsort() {
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.open_db(None).unwrap();
-   txn.put(&db, b"key4", b"val1", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key4", b"val2", WriteFlags::empty()).unwrap();
-   txn.put(&db, b"key4", b"val3", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key4", b"val1", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key4", b"val2", WriteFlags::empty()).unwrap();
+   txn.put(db.dbi(), b"key4", b"val3", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    {