feat(stages): normalized tx lookup commits (#3022)

This commit is contained in:
Roman Krasiuk
2023-06-07 16:44:45 +03:00
committed by GitHub
parent 4e7f31c11c
commit 99a314c59b

View File

@ -11,11 +11,10 @@ use reth_db::{
use reth_primitives::{
rpc_utils::keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
BlockNumber, TransactionSignedNoHash, TxNumber, H256,
TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::Transaction;
use std::ops::Deref;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::*;
@ -26,13 +25,13 @@ use tracing::*;
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
/// The number of blocks to commit at once
/// The number of lookup entries to commit at once
commit_threshold: u64,
}
impl Default for TransactionLookupStage {
fn default() -> Self {
Self { commit_threshold: 50_000 }
Self { commit_threshold: 100_000 }
}
}
@ -56,27 +55,18 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint().block_number))
}
let (start_block, end_block) = range.into_inner();
debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync");
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
let end_block = *block_range.end();
let mut block_meta_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let (_, first_block) = block_meta_cursor.seek_exact(start_block)?.ok_or(
StageError::from(TransactionLookupStageError::TransactionLookup { block: start_block }),
)?;
let (_, last_block) = block_meta_cursor.seek_exact(end_block)?.ok_or(StageError::from(
TransactionLookupStageError::TransactionLookup { block: end_block },
))?;
debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup");
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
let tx_walker =
tx_cursor.walk_range(first_block.first_tx_num()..=last_block.last_tx_num())?;
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = 100_000 / rayon::current_num_threads();
let mut channels = Vec::with_capacity(chunk_size);
@ -192,18 +182,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
#[derive(Error, Debug)]
enum TransactionLookupStageError {
#[error("Transaction lookup failed to find block {block}.")]
TransactionLookup { block: BlockNumber },
}
impl From<TransactionLookupStageError> for StageError {
fn from(error: TransactionLookupStageError) -> Self {
StageError::Fatal(Box::new(error))
}
}
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
@ -280,40 +258,52 @@ mod tests {
};
// Seed only once with full input range
runner.seed_execution(first_input).expect("failed to seed execution");
let seed = random_block_range(stage_progress + 1..=previous_stage, H256::zero(), 0..4); // set tx count range high enough to hit the threshold
runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution");
let total_txs = runner.tx.table::<tables::Transactions>().unwrap().len() as u64;
// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number == expected_progress &&
processed == runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64 &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
let mut tx_count = 0;
let expected_progress = seed
.iter()
.find(|x| {
tx_count += x.body.len();
tx_count as u64 > threshold
})
.map(|x| x.number)
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64,
total: total_txs
}
),
done: false
}
);
// Execute second time
// Execute second time to completion
runner.set_threshold(u64::MAX);
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput {checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
assert_matches!(result, Ok(_));
assert_eq!(
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_txs, total: total_txs }
),
done: true
}
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");