diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index f95ecfd0b..be2f1c64a 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -174,7 +174,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result { let storage = db.view(|tx| -> Result<_, DbError> { let mut cursor = tx.cursor_dup_read::()?; - let walker = cursor.first()?.map(|first| cursor.walk(first.0)).transpose()?; + let walker = cursor.first()?.map(|first| cursor.walk(Some(first.0))).transpose()?; Ok(walker.map(|mut walker| { let mut map: HashMap> = HashMap::new(); while let Some(Ok((address, slot))) = walker.next() { @@ -210,7 +210,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result { } Some(RootOrState::State(state)) => db.view(|tx| -> eyre::Result<()> { let mut cursor = tx.cursor_dup_read::()?; - let walker = cursor.first()?.map(|first| cursor.walk(first.0)).transpose()?; + let walker = cursor.first()?.map(|first| cursor.walk(Some(first.0))).transpose()?; let storage = walker.map(|mut walker| { let mut map: HashMap> = HashMap::new(); while let Some(Ok((address, slot))) = walker.next() { diff --git a/crates/stages/src/db.rs b/crates/stages/src/db.rs index 7a8c99ed6..7152b0038 100644 --- a/crates/stages/src/db.rs +++ b/crates/stages/src/db.rs @@ -199,7 +199,7 @@ where T2: Table, { let mut cursor = self.cursor_write::()?; - let mut walker = cursor.walk(start_at)?; + let mut walker = cursor.walk(Some(start_at))?; while let Some((_, value)) = walker.next().transpose()? { self.delete::(value, None)?; } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 1419a4570..dc230d896 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -619,7 +619,8 @@ mod tests { let mut prev_number: Option = None; let mut expected_transition_id = 0; - for entry in bodies_cursor.walk(first_body_key)? { + + for entry in bodies_cursor.walk(Some(first_body_key))? { let (number, body) = entry?; // Validate sequentiality only after prev progress, diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 717a69952..edab82fb8 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -115,7 +115,7 @@ impl ExecutionStage { tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block."); // iterate over all transactions - let mut tx_walker = tx_cursor.walk(body.start_tx_id)?; + let mut tx_walker = tx_cursor.walk(Some(body.start_tx_id))?; let mut transactions = Vec::with_capacity(body.tx_count as usize); // get next N transactions. for index in body.tx_id_range() { @@ -129,7 +129,7 @@ impl ExecutionStage { } // take signers - let mut tx_sender_walker = tx_sender.walk(body.start_tx_id)?; + let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?; let mut signers = Vec::with_capacity(body.tx_count as usize); for index in body.tx_id_range() { let (tx_index, tx) = tx_sender_walker @@ -198,7 +198,7 @@ impl ExecutionStage { if wipe_storage { // iterate over storage and save them before entry is deleted. tx.cursor_read::()? - .walk(address)? + .walk(Some(address))? .take_while(|res| { res.as_ref().map(|(k, _)| *k == address).unwrap_or_default() }) diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 566d14039..3911c0500 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -7,7 +7,7 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_primitives::{keccak256, Account, Address, H160}; +use reth_primitives::{keccak256, Account, Address}; use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, @@ -63,7 +63,7 @@ impl Stage for AccountHashingStage { tx.clear::()?; tx.commit()?; - let mut first_key = H160::zero(); + let mut first_key = None; loop { let next_key = { let mut accounts = tx.cursor_read::()?; @@ -85,7 +85,7 @@ impl Stage for AccountHashingStage { }; tx.commit()?; if let Some((next_key, _)) = next_key { - first_key = next_key; + first_key = Some(next_key); continue } break diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index a1ec03fc7..76289d2bb 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -8,7 +8,7 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256}; +use reth_primitives::{keccak256, Address, StorageEntry, H256, U256}; use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, @@ -63,7 +63,7 @@ impl Stage for StorageHashingStage { tx.clear::()?; tx.commit()?; - let mut first_key = H160::zero(); + let mut first_key = None; loop { let next_key = { let mut storage = tx.cursor_dup_read::()?; @@ -92,7 +92,7 @@ impl Stage for StorageHashingStage { tx.commit()?; first_key = match next_key { - Some(key) => key, + Some(key) => Some(key), None => break, }; } @@ -413,8 +413,7 @@ mod tests { ); expected += 1; } - let count = - tx.cursor_dup_read::()?.walk(H256::zero())?.count(); + let count = tx.cursor_dup_read::()?.walk(None)?.count(); assert_eq!(count, expected); Ok(()) diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 15a3c31a5..b240a95b9 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -59,7 +59,7 @@ impl Stage for IndexAccountHistoryStage { let account_changesets = tx .cursor_read::()? - .walk(from_transition)? + .walk(Some(from_transition))? .take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default()) .collect::, _>>()?; @@ -122,7 +122,7 @@ impl Stage for IndexAccountHistoryStage { let account_changeset = tx .cursor_read::()? - .walk(from_transition_rev)? + .walk(Some(from_transition_rev))? .take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default()) .collect::, _>>()?; diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 704defb76..2afde1a47 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -59,7 +59,7 @@ impl Stage for IndexStorageHistoryStage { let storage_chageset = tx .cursor_read::()? - .walk((from_transition, Address::zero()).into())? + .walk(Some((from_transition, Address::zero()).into()))? .take_while(|res| { res.as_ref().map(|(k, _)| k.transition_id() < to_transition).unwrap_or_default() }) @@ -128,7 +128,7 @@ impl Stage for IndexStorageHistoryStage { let storage_changesets = tx .cursor_read::()? - .walk((from_transition_rev, Address::zero()).into())? + .walk(Some((from_transition_rev, Address::zero()).into()))? .take_while(|res| { res.as_ref().map(|(k, _)| k.transition_id() < to_transition_rev).unwrap_or_default() }) diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 6e00d52c0..24cd27ed6 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -65,7 +65,7 @@ impl Stage for TotalDifficultyStage { debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry"); let walker = cursor_headers - .walk(start_block)? + .walk(Some(start_block))? .take_while(|e| e.as_ref().map(|(_, h)| h.number <= end_block).unwrap_or_default()); // Walk over newly inserted headers, update & insert td for entry in walker { diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index a3808bab4..7263a8a91 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -58,7 +58,7 @@ impl Stage for TransactionLookupStage { let mut tx_cursor = tx.cursor_write::()?; // Walk over block bodies within a specified range. - let bodies = cursor_bodies.walk(start_block)?.take_while(|entry| { + let bodies = cursor_bodies.walk(Some(start_block))?.take_while(|entry| { entry.as_ref().map(|(num, _)| *num <= end_block).unwrap_or_default() }); @@ -66,7 +66,7 @@ impl Stage for TransactionLookupStage { let mut tx_list = vec![]; for body_entry in bodies { let (_, body) = body_entry?; - let transactions = tx_cursor.walk(body.start_tx_id)?.take(body.tx_count as usize); + let transactions = tx_cursor.walk(Some(body.start_tx_id))?.take(body.tx_count as usize); for tx_entry in transactions { let (id, transaction) = tx_entry?; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 0ce309340..25aac890e 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -85,7 +85,9 @@ impl TestTransaction { T::Key: Default + Ord, { self.query(|tx| { - tx.cursor_read::()?.walk(T::Key::default())?.collect::, DbError>>() + tx.cursor_read::()? + .walk(Some(T::Key::default()))? + .collect::, DbError>>() }) } diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 45542aa20..be1f30501 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -203,7 +203,7 @@ impl DBTrieLoader { tx.clear::()?; let mut accounts_cursor = tx.cursor_read::()?; - let mut walker = accounts_cursor.walk(H256::zero())?; + let mut walker = accounts_cursor.walk(None)?; let db = Arc::new(HashDatabase::new(tx)?); diff --git a/crates/storage/db/benches/criterion.rs b/crates/storage/db/benches/criterion.rs index d1df4a0d2..0a814cd06 100644 --- a/crates/storage/db/benches/criterion.rs +++ b/crates/storage/db/benches/criterion.rs @@ -174,7 +174,7 @@ where black_box({ let mut cursor = tx.cursor_read::().expect("cursor"); - let walker = cursor.walk(input.first().unwrap().0.clone()).unwrap(); + let walker = cursor.walk(Some(input.first().unwrap().0.clone())).unwrap(); for element in walker { element.unwrap(); } @@ -264,9 +264,7 @@ where black_box({ let mut cursor = tx.cursor_dup_read::().expect("cursor"); - let walker = cursor - .walk_dup(input.first().unwrap().0.clone(), T::SubKey::default()) - .unwrap(); + let walker = cursor.walk_dup(None, Some(T::SubKey::default())).unwrap(); for element in walker { element.unwrap(); } diff --git a/crates/storage/db/src/abstraction/cursor.rs b/crates/storage/db/src/abstraction/cursor.rs index f56458db9..39e38c372 100644 --- a/crates/storage/db/src/abstraction/cursor.rs +++ b/crates/storage/db/src/abstraction/cursor.rs @@ -30,10 +30,12 @@ pub trait DbCursorRO<'tx, T: Table> { /// Returns the current `(key, value)` pair of the cursor. fn current(&mut self) -> PairResult; - /// Returns an iterator starting at a key greater or equal than `start_key`. + /// Returns an iterator that walks through the table. If `start_key` + /// is None, starts from the first entry of the table. If it not, starts at a key + /// greater or equal than the key value wrapped inside Some(). fn walk<'cursor>( &'cursor mut self, - start_key: T::Key, + start_key: Option, ) -> Result, Error> where Self: Sized; @@ -76,8 +78,8 @@ pub trait DbDupCursorRO<'tx, T: DupSort> { /// table. fn walk_dup<'cursor>( &'cursor mut self, - key: T::Key, - subkey: T::SubKey, + key: Option, + subkey: Option, ) -> Result, Error> where Self: Sized; diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index 23ab62f66..7a68ea870 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -132,7 +132,7 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock { fn walk<'cursor>( &'cursor mut self, - _start_key: T::Key, + _start_key: Option, ) -> Result, Error> where Self: Sized, @@ -184,8 +184,8 @@ impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock { fn walk_dup<'cursor>( &'cursor mut self, - _key: ::Key, - _subkey: ::SubKey, + _key: Option<::Key>, + _subkey: Option<::SubKey>, ) -> Result, Error> where Self: Sized, diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 95d1aa0f8..5e9d5b1f1 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -11,7 +11,7 @@ use crate::{ tables::utils::*, Error, }; -use reth_libmdbx::{self, TransactionKind, WriteFlags, RO, RW}; +use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW}; /// Alias type for a `(key, value)` result coming from a cursor. pub type PairResult = Result::Key, ::Value)>, Error>; @@ -75,16 +75,19 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> fn walk<'cursor>( &'cursor mut self, - start_key: T::Key, + start_key: Option, ) -> Result, Error> where Self: Sized, { - let start = self - .inner - .set_range(start_key.encode().as_ref()) - .map_err(|e| Error::Read(e.into()))? - .map(decoder::); + let start = if let Some(start_key) = start_key { + self.inner + .set_range(start_key.encode().as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(decoder::) + } else { + self.first().transpose() + }; Ok(Walker::new(self, start)) } @@ -112,17 +115,15 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> where Self: Sized, { - if let Some(start_key) = start_key { - let start = self - .inner + let start = if let Some(start_key) = start_key { + self.inner .set_range(start_key.encode().as_ref()) .map_err(|e| Error::Read(e.into()))? - .map(decoder::); + .map(decoder::) + } else { + self.last().transpose() + }; - return Ok(ReverseWalker::new(self, start)) - } - - let start = self.last().transpose(); Ok(ReverseWalker::new(self, start)) } } @@ -155,20 +156,50 @@ impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx, .transpose() } - /// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table. + /// Depending on its arguments, returns an iterator starting at: + /// - Some(key), Some(subkey): a `key` item whose data is >= than `subkey` + /// - Some(key), None: first item of a specified `key` + /// - None, Some(subkey): like first case, but in the first key + /// - None, None: first item in the table + /// of a DUPSORT table. fn walk_dup<'cursor>( &'cursor mut self, - key: T::Key, - subkey: T::SubKey, + key: Option, + subkey: Option, ) -> Result, Error> { - // encode key and decode it after. - let key = key.encode().as_ref().to_vec(); + let start = match (key, subkey) { + (Some(key), Some(subkey)) => { + // encode key and decode it after. + let key = key.encode().as_ref().to_vec(); - let start = self - .inner - .get_both_range(key.as_ref(), subkey.encode().as_ref()) - .map_err(|e| Error::Read(e.into()))? - .map(|val| decoder::((Cow::Owned(key), val))); + self.inner + .get_both_range(key.as_ref(), subkey.encode().as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(|val| decoder::((Cow::Owned(key), val))) + } + (Some(key), None) => { + let key = key.encode().as_ref().to_vec(); + + self.inner + .set(key.as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(|val| decoder::((Cow::Owned(key), val))) + } + (None, Some(subkey)) => { + if let Some((key, _)) = self.first()? { + let key = key.encode().as_ref().to_vec(); + + self.inner + .get_both_range(key.as_ref(), subkey.encode().as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(|val| decoder::((Cow::Owned(key), val))) + } else { + let err_code = MDBXError::to_err_code(&MDBXError::NotFound); + Some(Err(Error::Read(err_code))) + } + } + (None, None) => self.first().transpose(), + }; Ok(DupWalker::<'cursor, 'tx, T, Self> { cursor: self, start, _tx_phantom: PhantomData {} }) } diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 597992a94..cc017bc5c 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -211,7 +211,7 @@ mod tests { assert!(first.is_some(), "First should be our put"); // Walk - let walk = cursor.walk(key).unwrap(); + let walk = cursor.walk(Some(key.into())).unwrap(); let first = walk.into_iter().next().unwrap().unwrap(); assert_eq!(first.1, value, "First next should be put value"); } @@ -363,7 +363,7 @@ mod tests { // Confirm the result let tx = db.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_read::().unwrap(); - let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::>(); + let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::>(); assert_eq!(res, vec![0, 1, 2, 3, 4, 5]); tx.commit().expect(ERROR_COMMIT); } @@ -396,7 +396,7 @@ mod tests { // Confirm the result let tx = db.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_read::().unwrap(); - let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::>(); + let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::>(); assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); tx.commit().expect(ERROR_COMMIT); } @@ -423,7 +423,7 @@ mod tests { // Confirm the result let tx = db.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_read::().unwrap(); - let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::>(); + let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::>(); assert_eq!(res, vec![0, 1, 2, 3, 4, 5]); tx.commit().expect(ERROR_COMMIT); } @@ -451,7 +451,7 @@ mod tests { // Confirm the result let tx = db.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_read::().unwrap(); - let res = cursor.walk(0).unwrap().map(|res| res.unwrap().0).collect::>(); + let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::>(); assert_eq!(res, vec![0, 1, 3, 4, 5]); tx.commit().expect(ERROR_COMMIT); } @@ -570,7 +570,7 @@ mod tests { { let tx = env.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_dup_read::().unwrap(); - let mut walker = cursor.walk_dup(key, H256::from_low_u64_be(1)).unwrap(); + let mut walker = cursor.walk_dup(Some(key), Some(H256::from_low_u64_be(1))).unwrap(); assert_eq!( (key, value11), walker @@ -608,8 +608,7 @@ mod tests { { let tx = env.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_dup_read::().unwrap(); - let first = cursor.first().unwrap().unwrap(); - let mut walker = cursor.walk_dup(first.0, first.1.key).unwrap(); + let mut walker = cursor.walk_dup(None, None).unwrap(); // Notice that value11 and value22 have been ordered in the DB. assert_eq!(Some(Ok((key1, value00.clone()))), walker.next()); @@ -624,7 +623,7 @@ mod tests { let tx = env.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_dup_read::().unwrap(); let first = cursor.first().unwrap().unwrap(); - let mut walker = cursor.walk(first.0).unwrap(); + let mut walker = cursor.walk(Some(first.0)).unwrap(); assert_eq!(Some(Ok((key1, value00))), walker.next()); assert_eq!(Some(Ok((key1, value11))), walker.next()); assert_eq!(Some(Ok((key2, value22))), walker.next()); @@ -652,7 +651,7 @@ mod tests { let tx = env.tx().expect(ERROR_INIT_TX); let mut cursor = tx.cursor_dup_read::().unwrap(); let first = cursor.first().unwrap().unwrap(); - let mut walker = cursor.walk(first.0).unwrap(); + let mut walker = cursor.walk(Some(first.0)).unwrap(); // NOTE: Both values are present assert_eq!(Some(Ok((key1, value00.clone()))), walker.next()); @@ -690,7 +689,7 @@ mod tests { // It will seek the one greater or equal to the query. Since we have `Address | 100`, // `Address | 200` in the database and we're querying `Address | 150` it will return us // `Address | 200`. - let mut walker = cursor.walk(ShardedKey::new(real_key, 150)).unwrap(); + let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap(); let (key, list) = walker .next() .expect("element should exist.")