bug(stages): TxLookup/Sender stages fix range if there is no tx (#3479)

This commit is contained in:
rakita
2023-07-15 21:36:27 +02:00
committed by GitHub
parent 99a8e0f982
commit d001313f99
4 changed files with 58 additions and 28 deletions

View File

@ -155,6 +155,14 @@ impl Command {
)?;
insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TxHashNumber>()?;
tx.put::<tables::SyncStage>(
StageId::TransactionLookup.to_string(),
Default::default(),
)?;
insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
}
_ => {
info!("Nothing to do for stage {:?}", self.stage);
return Ok(())

View File

@ -1,14 +1,14 @@
use crate::error::StageError;
use async_trait::async_trait;
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_db::database::Database;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber, TxNumber,
};
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError};
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError, TransactionsProvider};
use std::{
cmp::{max, min},
ops::RangeInclusive,
ops::{Range, RangeInclusive},
};
/// Stage execution input, see [Stage::execute].
@ -77,30 +77,46 @@ impl ExecInput {
&self,
provider: &DatabaseProviderRW<'_, DB>,
tx_threshold: u64,
) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
let start_block = self.next_block();
let target_block = self.target();
let start_block_body = provider
.block_body_indices(start_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
let first_tx_num = start_block_body.first_tx_num();
let target_block = self.target();
let target_block_body = provider
.block_body_indices(target_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
let first_tx_number = start_block_body.first_tx_num();
let mut last_tx_number = start_block_body.last_tx_num();
let mut end_block_number = start_block;
let mut body_indices_cursor =
provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
for entry in body_indices_cursor.walk_range(start_block..=target_block)? {
let (block, body) = entry?;
last_tx_number = body.last_tx_num();
end_block_number = block;
let tx_count = (first_tx_number..=last_tx_number).count() as u64;
if tx_count > tx_threshold {
break
}
// number of transactions left to execute.
let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num;
if all_tx_cnt == 0 {
// if there is no more transaction return back.
return Ok((first_tx_num..first_tx_num, start_block..=target_block, true))
}
let is_final_range = end_block_number >= target_block;
Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range))
// get block of this tx
let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold {
(target_block, true, target_block_body.next_tx_num())
} else {
// get tx block number. next_tx_num in this case will be less thean all_tx_cnt.
// So we are sure that transaction must exist.
let end_block_number = provider
.transaction_block(first_tx_num + tx_threshold)?
.expect("block of tx must exist");
// we want to get range of all transactions of this block, so we are fetching block
// body.
let end_block_body = provider
.block_body_indices(end_block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
(end_block_number, false, end_block_body.next_tx_num())
};
let tx_range = first_tx_num..next_tx_num;
Ok((tx_range, start_block..=end_block, is_final_range))
}
}

View File

@ -85,7 +85,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<RawTable<tables::Transactions>>()?;
// Walk the transactions from start to end index (inclusive)
let raw_tx_range = RawKey::new(*tx_range.start())..=RawKey::new(*tx_range.end());
let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end);
let tx_walker = tx_cursor.walk_range(raw_tx_range)?;
// Iterate over transactions in chunks

View File

@ -246,13 +246,19 @@ impl TestTransaction {
blocks.into_iter().try_for_each(|block| {
Self::insert_header(tx, &block.header)?;
// Insert into body tables.
tx.put::<tables::BlockBodyIndices>(
block.number,
StoredBlockBodyIndices {
first_tx_num: next_tx_num,
tx_count: block.body.len() as u64,
},
)?;
let block_body_indices = StoredBlockBodyIndices {
first_tx_num: next_tx_num,
tx_count: block.body.len() as u64,
};
if !block.body.is_empty() {
tx.put::<tables::TransactionBlock>(
block_body_indices.last_tx_num(),
block.number,
)?;
}
tx.put::<tables::BlockBodyIndices>(block.number, block_body_indices)?;
block.body.iter().try_for_each(|body_tx| {
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone().into())?;
next_tx_num += 1;