feat: changed walk(), walk_dup() definitions and impls to accept Option<T::(sub)Key> #1197 (#1283)

Co-authored-by: Estéfano Bargas <estefano.bargas@fing.edu.uy>
Co-authored-by: lambdaclass-user <github@lambdaclass.com>
This commit is contained in:
Georgios Konstantopoulos
2023-02-10 21:10:31 -08:00
committed by GitHub
parent 6473547d03
commit ea5633b3c3
17 changed files with 103 additions and 71 deletions

View File

@ -174,7 +174,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
let storage = db.view(|tx| -> Result<_, DbError> {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.first()?.map(|first| cursor.walk(first.0)).transpose()?;
let walker = cursor.first()?.map(|first| cursor.walk(Some(first.0))).transpose()?;
Ok(walker.map(|mut walker| {
let mut map: HashMap<Address, HashMap<U256, U256>> = HashMap::new();
while let Some(Ok((address, slot))) = walker.next() {
@ -210,7 +210,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
}
Some(RootOrState::State(state)) => db.view(|tx| -> eyre::Result<()> {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.first()?.map(|first| cursor.walk(first.0)).transpose()?;
let walker = cursor.first()?.map(|first| cursor.walk(Some(first.0))).transpose()?;
let storage = walker.map(|mut walker| {
let mut map: HashMap<Address, HashMap<U256, U256>> = HashMap::new();
while let Some(Ok((address, slot))) = walker.next() {

View File

@ -199,7 +199,7 @@ where
T2: Table<Key = T1::Value>,
{
let mut cursor = self.cursor_write::<T1>()?;
let mut walker = cursor.walk(start_at)?;
let mut walker = cursor.walk(Some(start_at))?;
while let Some((_, value)) = walker.next().transpose()? {
self.delete::<T2>(value, None)?;
}

View File

@ -619,7 +619,8 @@ mod tests {
let mut prev_number: Option<BlockNumber> = None;
let mut expected_transition_id = 0;
for entry in bodies_cursor.walk(first_body_key)? {
for entry in bodies_cursor.walk(Some(first_body_key))? {
let (number, body) = entry?;
// Validate sequentiality only after prev progress,

View File

@ -115,7 +115,7 @@ impl ExecutionStage {
tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block.");
// iterate over all transactions
let mut tx_walker = tx_cursor.walk(body.start_tx_id)?;
let mut tx_walker = tx_cursor.walk(Some(body.start_tx_id))?;
let mut transactions = Vec::with_capacity(body.tx_count as usize);
// get next N transactions.
for index in body.tx_id_range() {
@ -129,7 +129,7 @@ impl ExecutionStage {
}
// take signers
let mut tx_sender_walker = tx_sender.walk(body.start_tx_id)?;
let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?;
let mut signers = Vec::with_capacity(body.tx_count as usize);
for index in body.tx_id_range() {
let (tx_index, tx) = tx_sender_walker
@ -198,7 +198,7 @@ impl ExecutionStage {
if wipe_storage {
// iterate over storage and save them before entry is deleted.
tx.cursor_read::<tables::PlainStorageState>()?
.walk(address)?
.walk(Some(address))?
.take_while(|res| {
res.as_ref().map(|(k, _)| *k == address).unwrap_or_default()
})

View File

@ -7,7 +7,7 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Account, Address, H160};
use reth_primitives::{keccak256, Account, Address};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
@ -63,7 +63,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx.clear::<tables::HashedAccount>()?;
tx.commit()?;
let mut first_key = H160::zero();
let mut first_key = None;
loop {
let next_key = {
let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
@ -85,7 +85,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
};
tx.commit()?;
if let Some((next_key, _)) = next_key {
first_key = next_key;
first_key = Some(next_key);
continue
}
break

View File

@ -8,7 +8,7 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256};
use reth_primitives::{keccak256, Address, StorageEntry, H256, U256};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
@ -63,7 +63,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.clear::<tables::HashedStorage>()?;
tx.commit()?;
let mut first_key = H160::zero();
let mut first_key = None;
loop {
let next_key = {
let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
@ -92,7 +92,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.commit()?;
first_key = match next_key {
Some(key) => key,
Some(key) => Some(key),
None => break,
};
}
@ -413,8 +413,7 @@ mod tests {
);
expected += 1;
}
let count =
tx.cursor_dup_read::<tables::HashedStorage>()?.walk(H256::zero())?.count();
let count = tx.cursor_dup_read::<tables::HashedStorage>()?.walk(None)?.count();
assert_eq!(count, expected);
Ok(())

View File

@ -59,7 +59,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let account_changesets = tx
.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition)?
.walk(Some(from_transition))?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?;
@ -122,7 +122,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let account_changeset = tx
.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition_rev)?
.walk(Some(from_transition_rev))?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?;

View File

@ -59,7 +59,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let storage_chageset = tx
.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition, Address::zero()).into())?
.walk(Some((from_transition, Address::zero()).into()))?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.transition_id() < to_transition).unwrap_or_default()
})
@ -128,7 +128,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let storage_changesets = tx
.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition_rev, Address::zero()).into())?
.walk(Some((from_transition_rev, Address::zero()).into()))?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.transition_id() < to_transition_rev).unwrap_or_default()
})

View File

@ -65,7 +65,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry");
let walker = cursor_headers
.walk(start_block)?
.walk(Some(start_block))?
.take_while(|e| e.as_ref().map(|(_, h)| h.number <= end_block).unwrap_or_default());
// Walk over newly inserted headers, update & insert td
for entry in walker {

View File

@ -58,7 +58,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
// Walk over block bodies within a specified range.
let bodies = cursor_bodies.walk(start_block)?.take_while(|entry| {
let bodies = cursor_bodies.walk(Some(start_block))?.take_while(|entry| {
entry.as_ref().map(|(num, _)| *num <= end_block).unwrap_or_default()
});
@ -66,7 +66,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let mut tx_list = vec![];
for body_entry in bodies {
let (_, body) = body_entry?;
let transactions = tx_cursor.walk(body.start_tx_id)?.take(body.tx_count as usize);
let transactions = tx_cursor.walk(Some(body.start_tx_id))?.take(body.tx_count as usize);
for tx_entry in transactions {
let (id, transaction) = tx_entry?;

View File

@ -85,7 +85,9 @@ impl TestTransaction {
T::Key: Default + Ord,
{
self.query(|tx| {
tx.cursor_read::<T>()?.walk(T::Key::default())?.collect::<Result<Vec<_>, DbError>>()
tx.cursor_read::<T>()?
.walk(Some(T::Key::default()))?
.collect::<Result<Vec<_>, DbError>>()
})
}

View File

@ -203,7 +203,7 @@ impl DBTrieLoader {
tx.clear::<tables::StoragesTrie>()?;
let mut accounts_cursor = tx.cursor_read::<tables::HashedAccount>()?;
let mut walker = accounts_cursor.walk(H256::zero())?;
let mut walker = accounts_cursor.walk(None)?;
let db = Arc::new(HashDatabase::new(tx)?);

View File

@ -174,7 +174,7 @@ where
black_box({
let mut cursor = tx.cursor_read::<T>().expect("cursor");
let walker = cursor.walk(input.first().unwrap().0.clone()).unwrap();
let walker = cursor.walk(Some(input.first().unwrap().0.clone())).unwrap();
for element in walker {
element.unwrap();
}
@ -264,9 +264,7 @@ where
black_box({
let mut cursor = tx.cursor_dup_read::<T>().expect("cursor");
let walker = cursor
.walk_dup(input.first().unwrap().0.clone(), T::SubKey::default())
.unwrap();
let walker = cursor.walk_dup(None, Some(T::SubKey::default())).unwrap();
for element in walker {
element.unwrap();
}

View File

@ -30,10 +30,12 @@ pub trait DbCursorRO<'tx, T: Table> {
/// Returns the current `(key, value)` pair of the cursor.
fn current(&mut self) -> PairResult<T>;
/// Returns an iterator starting at a key greater or equal than `start_key`.
/// Returns an iterator that walks through the table. If `start_key`
/// is None, starts from the first entry of the table. If it not, starts at a key
/// greater or equal than the key value wrapped inside Some().
fn walk<'cursor>(
&'cursor mut self,
start_key: T::Key,
start_key: Option<T::Key>,
) -> Result<Walker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized;
@ -76,8 +78,8 @@ pub trait DbDupCursorRO<'tx, T: DupSort> {
/// table.
fn walk_dup<'cursor>(
&'cursor mut self,
key: T::Key,
subkey: T::SubKey,
key: Option<T::Key>,
subkey: Option<T::SubKey>,
) -> Result<DupWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized;

View File

@ -132,7 +132,7 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock {
fn walk<'cursor>(
&'cursor mut self,
_start_key: T::Key,
_start_key: Option<T::Key>,
) -> Result<Walker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
@ -184,8 +184,8 @@ impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock {
fn walk_dup<'cursor>(
&'cursor mut self,
_key: <T>::Key,
_subkey: <T as DupSort>::SubKey,
_key: Option<<T>::Key>,
_subkey: Option<<T as DupSort>::SubKey>,
) -> Result<DupWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,

View File

@ -11,7 +11,7 @@ use crate::{
tables::utils::*,
Error,
};
use reth_libmdbx::{self, TransactionKind, WriteFlags, RO, RW};
use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
/// Alias type for a `(key, value)` result coming from a cursor.
pub type PairResult<T> = Result<Option<(<T as Table>::Key, <T as Table>::Value)>, Error>;
@ -75,16 +75,19 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
fn walk<'cursor>(
&'cursor mut self,
start_key: T::Key,
start_key: Option<T::Key>,
) -> Result<Walker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
{
let start = self
.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>);
let start = if let Some(start_key) = start_key {
self.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>)
} else {
self.first().transpose()
};
Ok(Walker::new(self, start))
}
@ -112,17 +115,15 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
where
Self: Sized,
{
if let Some(start_key) = start_key {
let start = self
.inner
let start = if let Some(start_key) = start_key {
self.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>);
.map(decoder::<T>)
} else {
self.last().transpose()
};
return Ok(ReverseWalker::new(self, start))
}
let start = self.last().transpose();
Ok(ReverseWalker::new(self, start))
}
}
@ -155,20 +156,50 @@ impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx,
.transpose()
}
/// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table.
/// Depending on its arguments, returns an iterator starting at:
/// - Some(key), Some(subkey): a `key` item whose data is >= than `subkey`
/// - Some(key), None: first item of a specified `key`
/// - None, Some(subkey): like first case, but in the first key
/// - None, None: first item in the table
/// of a DUPSORT table.
fn walk_dup<'cursor>(
&'cursor mut self,
key: T::Key,
subkey: T::SubKey,
key: Option<T::Key>,
subkey: Option<T::SubKey>,
) -> Result<DupWalker<'cursor, 'tx, T, Self>, Error> {
// encode key and decode it after.
let key = key.encode().as_ref().to_vec();
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
// encode key and decode it after.
let key = key.encode().as_ref().to_vec();
let start = self
.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)));
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}
(Some(key), None) => {
let key = key.encode().as_ref().to_vec();
self.inner
.set(key.as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}
(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let key = key.encode().as_ref().to_vec();
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
} else {
let err_code = MDBXError::to_err_code(&MDBXError::NotFound);
Some(Err(Error::Read(err_code)))
}
}
(None, None) => self.first().transpose(),
};
Ok(DupWalker::<'cursor, 'tx, T, Self> { cursor: self, start, _tx_phantom: PhantomData {} })
}

View File

@ -211,7 +211,7 @@ mod tests {
assert!(first.is_some(), "First should be our put");
// Walk
let walk = cursor.walk(key).unwrap();
let walk = cursor.walk(Some(key.into())).unwrap();
let first = walk.into_iter().next().unwrap().unwrap();
assert_eq!(first.1, value, "First next should be put value");
}
@ -363,7 +363,7 @@ mod tests {
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
tx.commit().expect(ERROR_COMMIT);
}
@ -396,7 +396,7 @@ mod tests {
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
tx.commit().expect(ERROR_COMMIT);
}
@ -423,7 +423,7 @@ mod tests {
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
tx.commit().expect(ERROR_COMMIT);
}
@ -451,7 +451,7 @@ mod tests {
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 3, 4, 5]);
tx.commit().expect(ERROR_COMMIT);
}
@ -570,7 +570,7 @@ mod tests {
{
let tx = env.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
let mut walker = cursor.walk_dup(key, H256::from_low_u64_be(1)).unwrap();
let mut walker = cursor.walk_dup(Some(key), Some(H256::from_low_u64_be(1))).unwrap();
assert_eq!(
(key, value11),
walker
@ -608,8 +608,7 @@ mod tests {
{
let tx = env.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
let first = cursor.first().unwrap().unwrap();
let mut walker = cursor.walk_dup(first.0, first.1.key).unwrap();
let mut walker = cursor.walk_dup(None, None).unwrap();
// Notice that value11 and value22 have been ordered in the DB.
assert_eq!(Some(Ok((key1, value00.clone()))), walker.next());
@ -624,7 +623,7 @@ mod tests {
let tx = env.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
let first = cursor.first().unwrap().unwrap();
let mut walker = cursor.walk(first.0).unwrap();
let mut walker = cursor.walk(Some(first.0)).unwrap();
assert_eq!(Some(Ok((key1, value00))), walker.next());
assert_eq!(Some(Ok((key1, value11))), walker.next());
assert_eq!(Some(Ok((key2, value22))), walker.next());
@ -652,7 +651,7 @@ mod tests {
let tx = env.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
let first = cursor.first().unwrap().unwrap();
let mut walker = cursor.walk(first.0).unwrap();
let mut walker = cursor.walk(Some(first.0)).unwrap();
// NOTE: Both values are present
assert_eq!(Some(Ok((key1, value00.clone()))), walker.next());
@ -690,7 +689,7 @@ mod tests {
// It will seek the one greater or equal to the query. Since we have `Address | 100`,
// `Address | 200` in the database and we're querying `Address | 150` it will return us
// `Address | 200`.
let mut walker = cursor.walk(ShardedKey::new(real_key, 150)).unwrap();
let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
let (key, list) = walker
.next()
.expect("element should exist.")