feat: remove get or take usage (#9412)

Dan Cline
2024-07-10 12:10:41 -04:00
committed by GitHub
parent d0f78bdb39
commit a29d8bdbdf
4 changed files with 609 additions and 189 deletions


@@ -585,13 +585,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(self.tx.commit()?)
}
// TODO(joshie) TEMPORARY should be moved to trait providers
/// Unwind or peek at last N blocks of state recreating the [`ExecutionOutcome`].
///
/// If UNWIND is set to true, the tip and latest state will be unwound
/// and returned together with all the blocks
///
/// If UNWIND is false, we will just read the state/blocks and return them.
/// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
@@ -610,7 +604,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
pub fn unwind_or_peek_state<const TAKE: bool>(
pub fn get_state(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<ExecutionOutcome> {
@@ -620,7 +614,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
let start_block_number = *range.start();
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range.clone())?;
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
@@ -630,9 +624,8 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset =
self.get_or_take::<tables::StorageChangeSets, TAKE>(storage_range)?;
let account_changeset = self.get_or_take::<tables::AccountChangeSets, TAKE>(range)?;
let storage_changeset = self.get::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.get::<tables::AccountChangeSets>(range)?;
// iterate previous value and get plain state value to create changeset
// Double option around Account represents whether the Account state is known (first option) and
@@ -701,45 +694,178 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
.push(old_storage);
}
if TAKE {
// iterate over the local plain state and remove all accounts and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
// iterate over block bodies and create the ExecutionResult
let mut receipt_iter =
self.get::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::new();
// the loop breaks when we are at the end of the blocks.
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
if let Some((_, receipt)) = receipt_iter.next() {
block_receipts.push(Some(receipt));
}
}
receipts.push(block_receipts);
}
Ok(ExecutionOutcome::new_init(
state,
reverts,
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
))
}
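
The doc comment above walks through how `get_state` rebuilds state from changesets: on the first sighting of an account or slot, the old value comes from the changeset and the new value from plain state; on every later sighting only the old value is pushed further back in time. The following standalone sketch illustrates that reverse walk with plain `HashMap`s and `u64` values standing in for the database tables and accounts; the names and types are illustrative, not the reth API.

use std::collections::HashMap;

/// Simplified stand-ins: `plain_state` holds the current value per address, and
/// `changesets` holds (block_number, address, value_before_that_block) in block order.
fn reconstruct_state(
    plain_state: &HashMap<u64, u64>,
    changesets: &[(u64, u64, u64)],
) -> HashMap<u64, (u64, u64)> {
    // address -> (old value at range start, new value at range end)
    let mut state: HashMap<u64, (u64, u64)> = HashMap::new();
    // Walk the changesets in reverse block order.
    for &(_block, address, old_value) in changesets.iter().rev() {
        state
            .entry(address)
            // Seen before: only the "old" side moves further back in time.
            .and_modify(|entry| entry.0 = old_value)
            // First sighting: old value from the changeset, new value from plain state.
            .or_insert_with(|| (old_value, plain_state.get(&address).copied().unwrap_or(0)));
    }
    state
}

fn main() {
    let plain_state = HashMap::from([(1, 30)]);
    // Address 1 was 10 before block 5 and 20 before block 6; it is 30 now.
    let changesets = vec![(5, 1, 10), (6, 1, 20)];
    assert_eq!(reconstruct_state(&plain_state, &changesets)[&1], (10, 30));
}
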
/// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
///
/// The latest state will be unwound and returned together with all the blocks
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
/// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
/// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
/// the changesets.
/// - In order to have both the old and new values in the changesets, we also access the
/// plain state tables.
/// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
/// we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the plain state
/// 3. Save the old value to the local state
/// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
/// have seen before we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
pub fn take_state(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<ExecutionOutcome> {
if range.is_empty() {
return Ok(ExecutionOutcome::default())
}
let start_block_number = *range.start();
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
// iterate previous value and get plain state value to create changeset
// Double option around Account represents whether the Account state is known (first option) and
// the account is removed (second option)
let mut state: BundleStateInit = HashMap::new();
// This does not work for blocks that are not at the tip, as the plain state is not the last
// state of the end of the range. We should rename the functions or add support to access
// history state. Accessing history state can be tricky, but we are not gaining
// anything.
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut reverts: RevertsInit = HashMap::new();
// add account changeset changes
for (block_number, account_before) in account_changeset.into_iter().rev() {
let AccountBeforeTx { info: old_info, address } = account_before;
match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
entry.insert((old_info, new_info, HashMap::new()));
}
hash_map::Entry::Occupied(mut entry) => {
// overwrite old account state.
entry.get_mut().0 = old_info;
}
}
// insert old info into reverts.
reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
}
// add storage changeset changes
for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
let BlockNumberAddress((block_number, address)) = block_and_address;
// get account state or insert from plain state.
let account_state = match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
entry.insert((present_info, present_info, HashMap::new()))
}
hash_map::Entry::Occupied(entry) => entry.into_mut(),
};
// match storage.
match account_state.2.entry(old_storage.key) {
hash_map::Entry::Vacant(entry) => {
let new_storage = plain_storage_cursor
.seek_by_key_subkey(address, old_storage.key)?
.filter(|storage| storage.key == old_storage.key)
.unwrap_or_default();
entry.insert((old_storage.value, new_storage.value));
}
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().0 = old_storage.value;
}
};
reverts
.entry(block_number)
.or_default()
.entry(address)
.or_default()
.1
.push(old_storage);
}
// iterate over the local plain state and remove all accounts and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry =
StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// insert value if needed
if *old_storage_value != U256::ZERO {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
// insert value if needed
if *old_storage_value != U256::ZERO {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
// iterate over block bodies and create the ExecutionResult
let mut receipt_iter = self
.get_or_take::<tables::Receipts, TAKE>(from_transaction_num..=to_transaction_num)?
.into_iter();
let mut receipt_iter =
self.take::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::new();
// the loop breaks when we are at the end of the blocks.
@@ -779,22 +905,6 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(entries)
}
/// Return a list of entries from the table, based on the given range.
///
/// If TAKE is true, the opened cursor will delete and return the entries for the given range.
/// Otherwise, they will just be returned.
#[inline]
pub fn get_or_take<T: Table, const TAKE: bool>(
&self,
range: impl RangeBounds<T::Key>,
) -> Result<Vec<KeyValue<T>>, DatabaseError> {
if TAKE {
self.take::<T>(range)
} else {
self.get::<T>(range)
}
}
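
The removed `get_or_take` collapsed two very different behaviours behind a const generic; this PR splits them into an explicit read-only `get` and a destructive `take`. Below is a minimal sketch of the two semantics over a `BTreeMap` used in place of a database cursor; it illustrates the split only and is not the `DatabaseProvider` implementation.

use std::collections::BTreeMap;
use std::ops::RangeBounds;

/// Read the entries in `range` without modifying the table.
fn get<K: Ord + Copy, V: Clone>(table: &BTreeMap<K, V>, range: impl RangeBounds<K>) -> Vec<(K, V)> {
    table.range(range).map(|(k, v)| (*k, v.clone())).collect()
}

/// Read the entries in `range` and delete them from the table.
fn take<K: Ord + Copy, V: Clone>(table: &mut BTreeMap<K, V>, range: impl RangeBounds<K>) -> Vec<(K, V)> {
    let entries = get(table, range);
    for (k, _) in &entries {
        table.remove(k);
    }
    entries
}

fn main() {
    let mut table = BTreeMap::from([(1, "a"), (2, "b"), (3, "c")]);
    assert_eq!(get(&table, 1..=2).len(), 2);      // read only
    assert_eq!(take(&mut table, 1..=2).len(), 2); // read and delete
    assert_eq!(table.len(), 1);                   // only key 3 remains
}
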
/// Return a list of entries from the table, based on the given range.
#[inline]
pub fn get<T: Table>(
@@ -820,13 +930,13 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(items)
}
/// Get requested blocks transaction with signer
pub(crate) fn get_take_block_transaction_range<const TAKE: bool>(
/// Get the requested blocks' transactions with senders
pub(crate) fn get_block_transaction_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>> {
// Read the range of block bodies to get all transaction ids of this range.
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;
let block_bodies = self.get::<tables::BlockBodyIndices>(range)?;
if block_bodies.is_empty() {
return Ok(Vec::new())
@@ -843,14 +953,13 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
// Get transactions and senders
let transactions = self
.get_or_take::<tables::Transactions, TAKE>(first_transaction..=last_transaction)?
.get::<tables::Transactions>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
let mut senders = self.get_or_take::<tables::TransactionSenders, TAKE>(
first_transaction..=last_transaction,
)?;
let mut senders =
self.get::<tables::TransactionSenders>(first_transaction..=last_transaction)?;
// Recover senders manually if not found in db
// NOTE: Transactions are always guaranteed to be in the database whereas
@@ -917,22 +1026,207 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
);
}
if TAKE {
// Remove TransactionHashNumbers
let mut tx_hash_cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
for (_, tx) in &transactions {
if tx_hash_cursor.seek_exact(tx.hash())?.is_some() {
tx_hash_cursor.delete_current()?;
// Merge transactions into blocks
let mut block_tx = Vec::with_capacity(block_bodies.len());
let mut senders = senders.into_iter();
let mut transactions = transactions.into_iter();
for (block_number, block_body) in block_bodies {
let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
let tx = transactions.next();
let sender = senders.next();
let recovered = match (tx, sender) {
(Some((tx_id, tx)), Some((sender_tx_id, sender))) => {
if tx_id != sender_tx_id {
Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id })
} else {
Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender))
}
}
(Some((tx_id, _)), _) | (_, Some((tx_id, _))) => {
Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id })
}
(None, None) => Err(ProviderError::BlockBodyTransactionCount),
}?;
one_block_tx.push(recovered)
}
block_tx.push((block_number, one_block_tx));
}
Ok(block_tx)
}
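
`get_block_transaction_range` pairs each transaction with its sender by walking two TxNumber-ordered lists in lockstep and treating any id divergence as an error. The standalone sketch below reproduces that pairing with plain tuples and a stand-in error type; `PairError` and the string payloads are illustrative, not reth's `ProviderError` or transaction types.

#[derive(Debug, PartialEq)]
enum PairError {
    IdMismatch { tx_id: u64 },
    MissingEntry,
}

/// Zip transactions with senders, requiring the tx ids to line up exactly.
fn pair_with_senders(
    transactions: Vec<(u64, String)>,
    senders: Vec<(u64, String)>,
) -> Result<Vec<(String, String)>, PairError> {
    let mut senders = senders.into_iter();
    let mut out = Vec::with_capacity(transactions.len());
    for (tx_id, tx) in transactions {
        match senders.next() {
            // Matching ids: the sender belongs to this transaction.
            Some((sender_tx_id, sender)) if sender_tx_id == tx_id => out.push((tx, sender)),
            // Ids diverged: the tables are out of sync for this range.
            Some((_, _)) => return Err(PairError::IdMismatch { tx_id }),
            // Ran out of senders before running out of transactions.
            None => return Err(PairError::MissingEntry),
        }
    }
    Ok(out)
}

fn main() {
    let txs = vec![(7, "tx7".to_string()), (8, "tx8".to_string())];
    let senders = vec![(7, "alice".to_string()), (8, "bob".to_string())];
    assert_eq!(pair_with_senders(txs, senders).unwrap().len(), 2);
}
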
/// Remove requested block transactions, without returning them.
///
/// This will remove block data for the given range from the following tables:
/// * [`BlockBodyIndices`](tables::BlockBodyIndices)
/// * [`Transactions`](tables::Transactions)
/// * [`TransactionSenders`](tables::TransactionSenders)
/// * [`TransactionHashNumbers`](tables::TransactionHashNumbers)
/// * [`TransactionBlocks`](tables::TransactionBlocks)
pub fn remove_block_transaction_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<()> {
// Read the range of block bodies to get all transaction ids of this range.
let block_bodies = self.take::<tables::BlockBodyIndices>(range)?;
if block_bodies.is_empty() {
return Ok(())
}
// Compute the first and last tx ID in the range
let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num();
// If this is the case then all of the blocks in the range are empty
if last_transaction < first_transaction {
return Ok(())
}
// Take the transactions so that we can then remove the related indices
let transactions = self
.take::<tables::Transactions>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
// remove senders
self.remove::<tables::TransactionSenders>(first_transaction..=last_transaction)?;
// Remove TransactionHashNumbers
let mut tx_hash_cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
for (_, tx) in &transactions {
if tx_hash_cursor.seek_exact(tx.hash())?.is_some() {
tx_hash_cursor.delete_current()?;
}
}
// Remove the TransactionBlocks index if there are transactions present
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
self.remove::<tables::TransactionBlocks>(tx_id_range)?;
}
Ok(())
}
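
Both the remove and take paths bail out early when `last_transaction < first_transaction`, which can only happen when every block body in the range has `tx_count == 0`. The tiny sketch below shows why, with a simplified body-indices struct whose field names mirror the diff but which is otherwise illustrative; in particular, the saturating arithmetic is an assumption, not reth's exact implementation.

/// Simplified stand-in for the stored block body indices.
#[derive(Clone, Copy)]
struct BodyIndices {
    first_tx_num: u64,
    tx_count: u64,
}

impl BodyIndices {
    const fn first_tx_num(&self) -> u64 {
        self.first_tx_num
    }
    /// Last tx number of the block; for an empty block this lands below `first_tx_num`.
    const fn last_tx_num(&self) -> u64 {
        (self.first_tx_num + self.tx_count).saturating_sub(1)
    }
}

fn main() {
    // Three empty blocks: the next tx id would be 10, but none of them holds a tx.
    let bodies = [BodyIndices { first_tx_num: 10, tx_count: 0 }; 3];
    let first = bodies.first().unwrap().first_tx_num();
    let last = bodies.last().unwrap().last_tx_num();
    // `last < first` holds exactly when the whole range carries zero transactions,
    // so the function can return early without touching the transaction tables.
    assert!(last < first);
}
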
/// Get the requested blocks' transactions with senders, also removing them from the database
///
/// This will remove block data for the given range from the following tables:
/// * [`BlockBodyIndices`](tables::BlockBodyIndices)
/// * [`Transactions`](tables::Transactions)
/// * [`TransactionSenders`](tables::TransactionSenders)
/// * [`TransactionHashNumbers`](tables::TransactionHashNumbers)
/// * [`TransactionBlocks`](tables::TransactionBlocks)
pub fn take_block_transaction_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>> {
// Read the range of block bodies to get all transaction ids of this range.
let block_bodies = self.get::<tables::BlockBodyIndices>(range)?;
if block_bodies.is_empty() {
return Ok(Vec::new())
}
// Compute the first and last tx ID in the range
let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num();
// If this is the case then all of the blocks in the range are empty
if last_transaction < first_transaction {
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect())
}
// Get transactions and senders
let transactions = self
.take::<tables::Transactions>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
let mut senders =
self.take::<tables::TransactionSenders>(first_transaction..=last_transaction)?;
// Recover senders manually if not found in db
// NOTE: Transactions are always guaranteed to be in the database whereas
// senders might be pruned.
if senders.len() != transactions.len() {
if senders.len() > transactions.len() {
error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(),
first_tx=%first_transaction, last_tx=%last_transaction,
"unexpected senders and transactions mismatch");
}
let missing = transactions.len().saturating_sub(senders.len());
senders.reserve(missing);
// Find all missing senders, their corresponding tx numbers, and the indexes in the original
// `senders` vector at which the recovered senders will be inserted.
let mut missing_senders = Vec::with_capacity(missing);
{
let mut senders = senders.iter().peekable();
// `transactions` contain all entries. `senders` contain _some_ of the senders for
// these transactions. Both are sorted and indexed by `TxNumber`.
//
// The general idea is to iterate on both `transactions` and `senders`, and advance
// the `senders` iteration only if it matches the current `transactions` entry's
// `TxNumber`. Otherwise, add the transaction to the list of missing senders.
for (i, (tx_number, transaction)) in transactions.iter().enumerate() {
if let Some((sender_tx_number, _)) = senders.peek() {
if sender_tx_number == tx_number {
// If current sender's `TxNumber` matches current transaction's
// `TxNumber`, advance the senders iterator.
senders.next();
} else {
// If current sender's `TxNumber` doesn't match current transaction's
// `TxNumber`, add it to missing senders.
missing_senders.push((i, tx_number, transaction));
}
} else {
// If there are no more senders left, but we're still iterating over
// transactions, add them to missing senders
missing_senders.push((i, tx_number, transaction));
}
}
}
// Remove the TransactionBlocks index if there are transactions present
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
// NOTE: we are in this branch because `TAKE` is true, so we can use the `remove`
// method
self.remove::<tables::TransactionBlocks>(tx_id_range)?;
// Recover senders
let recovered_senders = TransactionSigned::recover_signers(
missing_senders.iter().map(|(_, _, tx)| *tx).collect::<Vec<_>>(),
missing_senders.len(),
)
.ok_or(ProviderError::SenderRecoveryError)?;
// Insert recovered senders along with tx numbers at the corresponding indexes to the
// original `senders` vector
for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) {
// Insert will put recovered senders at necessary positions and shift the rest
senders.insert(i, (*tx_number, sender));
}
// Debug assertions which are triggered during the test to ensure that all senders are
// present and sorted
debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders");
debug_assert!(
senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0),
"senders not sorted"
);
}
// Remove TransactionHashNumbers
let mut tx_hash_cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
for (_, tx) in &transactions {
if tx_hash_cursor.seek_exact(tx.hash())?.is_some() {
tx_hash_cursor.delete_current()?;
}
}
// Remove the TransactionBlocks index if there are transactions present
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
self.remove::<tables::TransactionBlocks>(tx_id_range)?;
}
// Merge transactions into blocks
@@ -966,8 +1260,42 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(block_tx)
}
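
`take_block_transaction_range` (and its `get_` counterpart) recovers pruned senders by walking `transactions` and the possibly shorter `senders` list in lockstep, recording the indexes where a sender is missing, and inserting the recovered senders back at those positions so the list stays sorted by TxNumber. The following standalone sketch reproduces that merge with strings standing in for transactions and a closure standing in for signature recovery; all names here are illustrative.

/// Fill in senders that are missing from `senders`, keeping it sorted by tx number.
/// `recover` stands in for signature recovery from the raw transaction.
fn fill_missing_senders(
    transactions: &[(u64, String)],
    senders: &mut Vec<(u64, String)>,
    recover: impl Fn(&str) -> String,
) {
    let mut missing = Vec::new();
    {
        let mut known = senders.iter().peekable();
        // Advance `known` only when its tx number matches the current transaction;
        // otherwise the sender for this transaction is missing.
        for (i, (tx_number, tx)) in transactions.iter().enumerate() {
            match known.peek() {
                Some((sender_tx_number, _)) if sender_tx_number == tx_number => {
                    known.next();
                }
                _ => missing.push((i, *tx_number, tx.clone())),
            }
        }
    }
    // Insert recovered senders at the recorded indexes; earlier inserts shift later ones
    // into place because `missing` is in ascending index order.
    for (i, tx_number, tx) in missing {
        senders.insert(i, (tx_number, recover(&tx)));
    }
    debug_assert_eq!(senders.len(), transactions.len());
}

fn main() {
    let txs = vec![(1, "t1".into()), (2, "t2".into()), (3, "t3".into())];
    // The sender for tx 2 was pruned.
    let mut senders = vec![(1, "alice".into()), (3, "carol".into())];
    fill_missing_senders(&txs, &mut senders, |tx| format!("recovered({tx})"));
    assert_eq!(senders[1], (2, "recovered(t2)".to_string()));
}
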
/// Get or unwind the given range of blocks.
pub fn get_take_block_range<const TAKE: bool>(
/// Remove the given range of blocks, without returning any of the blocks.
///
/// This will remove block data for the given range from the following tables:
/// * [`HeaderNumbers`](tables::HeaderNumbers)
/// * [`CanonicalHeaders`](tables::CanonicalHeaders)
/// * [`BlockOmmers`](tables::BlockOmmers)
/// * [`BlockWithdrawals`](tables::BlockWithdrawals)
/// * [`BlockRequests`](tables::BlockRequests)
/// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
///
/// This will also remove transaction data according to
/// [`remove_block_transaction_range`](Self::remove_block_transaction_range).
pub fn remove_block_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<()> {
let block_headers = self.remove::<tables::Headers>(range.clone())?;
if block_headers == 0 {
return Ok(())
}
self.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
self.remove::<tables::CanonicalHeaders>(range.clone())?;
self.remove::<tables::BlockOmmers>(range.clone())?;
self.remove::<tables::BlockWithdrawals>(range.clone())?;
self.remove::<tables::BlockRequests>(range.clone())?;
self.remove_block_transaction_range(range.clone())?;
self.remove::<tables::HeaderTerminalDifficulties>(range)?;
Ok(())
}
/// Get the given range of blocks.
pub fn get_block_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
@@ -980,33 +1308,128 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
// - Requests
// - Signers
let block_headers = self.get_or_take::<tables::Headers, TAKE>(range.clone())?;
let block_headers = self.get::<tables::Headers>(range.clone())?;
if block_headers.is_empty() {
return Ok(Vec::new())
}
let block_header_hashes =
self.get_or_take::<tables::CanonicalHeaders, TAKE>(range.clone())?;
let block_ommers = self.get_or_take::<tables::BlockOmmers, TAKE>(range.clone())?;
let block_withdrawals =
self.get_or_take::<tables::BlockWithdrawals, TAKE>(range.clone())?;
let block_requests = self.get_or_take::<tables::BlockRequests, TAKE>(range.clone())?;
let block_header_hashes = self.get::<tables::CanonicalHeaders>(range.clone())?;
let block_ommers = self.get::<tables::BlockOmmers>(range.clone())?;
let block_withdrawals = self.get::<tables::BlockWithdrawals>(range.clone())?;
let block_requests = self.get::<tables::BlockRequests>(range.clone())?;
let block_tx = self.get_take_block_transaction_range::<TAKE>(range.clone())?;
let block_tx = self.get_block_transaction_range(range)?;
if TAKE {
// rm HeaderTerminalDifficulties
// NOTE: we are in this branch because `TAKE` is true, so we can use the `remove` method
self.remove::<tables::HeaderTerminalDifficulties>(range)?;
// rm HeaderNumbers
let mut header_number_cursor = self.tx.cursor_write::<tables::HeaderNumbers>()?;
for (_, hash) in &block_header_hashes {
if header_number_cursor.seek_exact(*hash)?.is_some() {
header_number_cursor.delete_current()?;
// merge all into block
let block_header_iter = block_headers.into_iter();
let block_header_hashes_iter = block_header_hashes.into_iter();
let block_tx_iter = block_tx.into_iter();
// Ommers can be empty for some blocks
let mut block_ommers_iter = block_ommers.into_iter();
let mut block_withdrawals_iter = block_withdrawals.into_iter();
let mut block_requests_iter = block_requests.into_iter();
let mut block_ommers = block_ommers_iter.next();
let mut block_withdrawals = block_withdrawals_iter.next();
let mut block_requests = block_requests_iter.next();
let mut blocks = Vec::new();
for ((main_block_number, header), (_, header_hash), (_, tx)) in
izip!(block_header_iter.into_iter(), block_header_hashes_iter, block_tx_iter)
{
let header = header.seal(header_hash);
let (body, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip();
// Ommers can be missing
let mut ommers = Vec::new();
if let Some((block_number, _)) = block_ommers.as_ref() {
if *block_number == main_block_number {
ommers = block_ommers.take().unwrap().1.ommers;
block_ommers = block_ommers_iter.next();
}
};
// withdrawals can be missing
let shanghai_is_active =
self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp);
let mut withdrawals = Some(Withdrawals::default());
if shanghai_is_active {
if let Some((block_number, _)) = block_withdrawals.as_ref() {
if *block_number == main_block_number {
withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals);
block_withdrawals = block_withdrawals_iter.next();
}
}
} else {
withdrawals = None
}
// requests can be missing
let prague_is_active = self.chain_spec.is_prague_active_at_timestamp(header.timestamp);
let mut requests = Some(Requests::default());
if prague_is_active {
if let Some((block_number, _)) = block_requests.as_ref() {
if *block_number == main_block_number {
requests = Some(block_requests.take().unwrap().1);
block_requests = block_requests_iter.next();
}
}
} else {
requests = None;
}
blocks.push(SealedBlockWithSenders {
block: SealedBlock { header, body, ommers, withdrawals, requests },
senders,
})
}
Ok(blocks)
}
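
When stitching blocks back together, `get_block_range` and `take_block_range` zip the dense header/hash/transaction streams while peeking one entry ahead into the sparse ommers, withdrawals, and requests tables, consuming an entry only when its block number matches the block currently being assembled. Below is a compact sketch of that sparse join with plain numbers; the types are illustrative, not the reth tables.

/// Join a dense, ordered list of block numbers with a sparse, ordered list of
/// per-block extras, yielding `None` for blocks that have no extra entry.
fn join_sparse(blocks: &[u64], sparse: &[(u64, &'static str)]) -> Vec<(u64, Option<&'static str>)> {
    let mut sparse_iter = sparse.iter().copied();
    // Hold the next sparse entry and only consume it when the block number matches.
    let mut pending = sparse_iter.next();
    let mut out = Vec::with_capacity(blocks.len());
    for &block_number in blocks {
        let mut extra = None;
        if let Some((sparse_number, value)) = pending {
            if sparse_number == block_number {
                extra = Some(value);
                pending = sparse_iter.next();
            }
        }
        out.push((block_number, extra));
    }
    out
}

fn main() {
    // Only blocks 11 and 13 have ommers in this made-up range.
    let blocks: [u64; 4] = [10, 11, 12, 13];
    let ommers: [(u64, &str); 2] = [(11, "ommers@11"), (13, "ommers@13")];
    let joined = join_sparse(&blocks, &ommers);
    assert_eq!(joined[0].1, None);
    assert_eq!(joined[1].1, Some("ommers@11"));
}
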
/// Remove the given range of blocks, and return them.
///
/// This will remove block data for the given range from the following tables:
/// * [`HeaderNumbers`](tables::HeaderNumbers)
/// * [`CanonicalHeaders`](tables::CanonicalHeaders)
/// * [`BlockOmmers`](tables::BlockOmmers)
/// * [`BlockWithdrawals`](tables::BlockWithdrawals)
/// * [`BlockRequests`](tables::BlockRequests)
/// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
///
/// This will also remove transaction data according to
/// [`take_block_transaction_range`](Self::take_block_transaction_range).
pub fn take_block_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
// For blocks we need:
//
// - Headers
// - Bodies (transactions)
// - Uncles/ommers
// - Withdrawals
// - Requests
// - Signers
let block_headers = self.take::<tables::Headers>(range.clone())?;
if block_headers.is_empty() {
return Ok(Vec::new())
}
self.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
let block_header_hashes = self.take::<tables::CanonicalHeaders>(range.clone())?;
let block_ommers = self.take::<tables::BlockOmmers>(range.clone())?;
let block_withdrawals = self.take::<tables::BlockWithdrawals>(range.clone())?;
let block_requests = self.take::<tables::BlockRequests>(range.clone())?;
let block_tx = self.take_block_transaction_range(range.clone())?;
// rm HeaderTerminalDifficulties
self.remove::<tables::HeaderTerminalDifficulties>(range)?;
// merge all into block
let block_header_iter = block_headers.into_iter();
let block_header_hashes_iter = block_header_hashes.into_iter();
@@ -2568,95 +2991,102 @@ impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
}
impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
/// Return the range of blocks and their execution result
fn get_or_take_block_and_execution_range<const TAKE: bool>(
fn get_block_and_execution_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Chain> {
if TAKE {
let storage_range = BlockNumberAddress::range(range.clone());
// get blocks
let blocks = self.get_block_range(range.clone())?;
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(range.clone())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
// get execution res
let execution_state = self.get_state(range)?;
Ok(Chain::new(blocks, execution_state, None))
}
fn take_block_and_execution_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Chain> {
let storage_range = BlockNumberAddress::range(range.clone());
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(range.clone())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
// Unwind account history indices.
self.unwind_account_history_indices(range.clone())?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage history indices.
self.unwind_storage_history_indices(storage_range)?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// The state root should always be correct as we are reverting state,
// but for the sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
trie_updates.write_to_database(&self.tx)?;
}
// Unwind account history indices.
self.unwind_account_history_indices(range.clone())?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage history indices.
self.unwind_storage_history_indices(storage_range)?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// The state root should always be correct as we are reverting state,
// but for the sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
trie_updates.write_to_database(&self.tx)?;
// get blocks
let blocks = self.get_take_block_range::<TAKE>(range.clone())?;
let blocks = self.take_block_range(range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
// get execution res
let execution_state = self.unwind_or_peek_state::<TAKE>(range.clone())?;
let execution_state = self.take_state(range.clone())?;
// Remove block bodies: they are needed for both getting the block range and the block execution
// results, which is why they are deleted afterwards.
if TAKE {
// rm block bodies
// NOTE: we are in this branch because `TAKE` is true, so we can use the `remove` method
self.remove::<tables::BlockBodyIndices>(range)?;
self.remove::<tables::BlockBodyIndices>(range)?;
// Update pipeline progress
if let Some(fork_number) = unwind_to {
self.update_pipeline_stages(fork_number, true)?;
}
// Update pipeline progress
if let Some(fork_number) = unwind_to {
self.update_pipeline_stages(fork_number, true)?;
}
Ok(Chain::new(blocks, execution_state, None))
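
`take_block_and_execution_range` verifies the unwind by recomputing the state root with pre-loaded prefix sets: every changed hashed address goes into the account prefix set, every changed hashed slot into that account's storage prefix set, and accounts unwound to `None` are tracked as destroyed. The simplified sketch below captures that bookkeeping, with fixed-size byte arrays and sorted `Vec`s standing in for reth_trie's `Nibbles` and `PrefixSetMut`; it illustrates the data flow only, not the trie implementation.

use std::collections::{HashMap, HashSet};

/// Simplified stand-in: hashed keys are plain byte arrays and a "prefix set" is a sorted Vec.
type HashedKey = [u8; 32];

fn build_prefix_sets(
    changed_accounts: &[(HashedKey, Option<u64>)],   // hashed address -> account (None = destroyed)
    changed_storage: &[(HashedKey, Vec<HashedKey>)], // hashed address -> changed hashed slots
) -> (Vec<HashedKey>, HashMap<HashedKey, Vec<HashedKey>>, HashSet<HashedKey>) {
    let mut account_prefix_set = Vec::with_capacity(changed_accounts.len());
    let mut destroyed_accounts = HashSet::new();
    for (hashed_address, account) in changed_accounts {
        account_prefix_set.push(*hashed_address);
        if account.is_none() {
            destroyed_accounts.insert(*hashed_address);
        }
    }
    let mut storage_prefix_sets: HashMap<HashedKey, Vec<HashedKey>> = HashMap::new();
    for (hashed_address, hashed_slots) in changed_storage {
        // Any storage change also marks the account subtree as dirty.
        account_prefix_set.push(*hashed_address);
        storage_prefix_sets.entry(*hashed_address).or_default().extend(hashed_slots.iter().copied());
    }
    account_prefix_set.sort_unstable();
    account_prefix_set.dedup();
    (account_prefix_set, storage_prefix_sets, destroyed_accounts)
}

fn main() {
    let a = [1u8; 32];
    let b = [2u8; 32];
    let (accounts, storages, destroyed) =
        build_prefix_sets(&[(a, None)], &[(b, vec![[9u8; 32]])]);
    assert_eq!(accounts, vec![a, b]);
    assert!(destroyed.contains(&a));
    assert_eq!(storages[&b].len(), 1);
}
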