refactor: Stage execute and unwind take an immutable provider reference instead (#3390)

This commit is contained in:
joshieDo
2023-06-27 14:35:00 +01:00
committed by GitHub
parent 46dd5b8482
commit e10ab387a1
27 changed files with 102 additions and 104 deletions

View File

@ -69,7 +69,7 @@ impl Command {
let db = Arc::new(init_db(db_path)?); let db = Arc::new(init_db(db_path)?);
let factory = ProviderFactory::new(&db, self.chain.clone()); let factory = ProviderFactory::new(&db, self.chain.clone());
let mut provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?; let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
let execution_checkpoint_block = let execution_checkpoint_block =
provider_rw.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number; provider_rw.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number;
@ -110,7 +110,7 @@ impl Command {
execution_stage execution_stage
.execute( .execute(
&mut provider_rw, &provider_rw,
ExecInput { ExecInput {
target: Some(block), target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new), checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
@ -122,7 +122,7 @@ impl Command {
while !account_hashing_done { while !account_hashing_done {
let output = account_hashing_stage let output = account_hashing_stage
.execute( .execute(
&mut provider_rw, &provider_rw,
ExecInput { ExecInput {
target: Some(block), target: Some(block),
checkpoint: progress.map(StageCheckpoint::new), checkpoint: progress.map(StageCheckpoint::new),
@ -136,7 +136,7 @@ impl Command {
while !storage_hashing_done { while !storage_hashing_done {
let output = storage_hashing_stage let output = storage_hashing_stage
.execute( .execute(
&mut provider_rw, &provider_rw,
ExecInput { ExecInput {
target: Some(block), target: Some(block),
checkpoint: progress.map(StageCheckpoint::new), checkpoint: progress.map(StageCheckpoint::new),
@ -148,7 +148,7 @@ impl Command {
let incremental_result = merkle_stage let incremental_result = merkle_stage
.execute( .execute(
&mut provider_rw, &provider_rw,
ExecInput { ExecInput {
target: Some(block), target: Some(block),
checkpoint: progress.map(StageCheckpoint::new), checkpoint: progress.map(StageCheckpoint::new),
@ -171,7 +171,7 @@ impl Command {
let clean_input = ExecInput { target: Some(block), checkpoint: None }; let clean_input = ExecInput { target: Some(block), checkpoint: None };
loop { loop {
let clean_result = merkle_stage.execute(&mut provider_rw, clean_input).await; let clean_result = merkle_stage.execute(&provider_rw, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed"); assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done { if clean_result.unwrap().done {
break break

View File

@ -95,13 +95,13 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>, output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone()); let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone())); let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone()));
exec_stage exec_stage
.unwind( .unwind(
&mut provider, &provider,
UnwindInput { UnwindInput {
unwind_to: from, unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number), checkpoint: StageCheckpoint::new(tip_block_number),
@ -130,12 +130,12 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage. [dry-run]"); info!(target: "reth::cli", "Executing stage. [dry-run]");
let factory = ProviderFactory::new(&output_db, chain.clone()); let factory = ProviderFactory::new(&output_db, chain.clone());
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone())); let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));
exec_stage exec_stage
.execute( .execute(
&mut provider, &provider,
reth_stages::ExecInput { reth_stages::ExecInput {
target: Some(to), target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)), checkpoint: Some(StageCheckpoint::new(from)),

View File

@ -39,12 +39,12 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>, output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone()); let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default(); let mut exec_stage = AccountHashingStage::default();
exec_stage exec_stage
.unwind( .unwind(
&mut provider, &provider,
UnwindInput { UnwindInput {
unwind_to: from, unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number), checkpoint: StageCheckpoint::new(tip_block_number),
@ -69,7 +69,7 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage."); info!(target: "reth::cli", "Executing stage.");
let factory = ProviderFactory::new(&output_db, chain); let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage { let mut exec_stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch clean_threshold: 1, // Forces hashing from scratch
..Default::default() ..Default::default()
@ -79,7 +79,7 @@ async fn dry_run<DB: Database>(
while !exec_output { while !exec_output {
exec_output = exec_stage exec_output = exec_stage
.execute( .execute(
&mut provider, &provider,
reth_stages::ExecInput { reth_stages::ExecInput {
target: Some(to), target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)), checkpoint: Some(StageCheckpoint::new(from)),

View File

@ -34,13 +34,13 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>, output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone()); let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage::default(); let mut exec_stage = StorageHashingStage::default();
exec_stage exec_stage
.unwind( .unwind(
&mut provider, &provider,
UnwindInput { UnwindInput {
unwind_to: from, unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number), checkpoint: StageCheckpoint::new(tip_block_number),
@ -68,7 +68,7 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage."); info!(target: "reth::cli", "Executing stage.");
let factory = ProviderFactory::new(&output_db, chain); let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage { let mut exec_stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch clean_threshold: 1, // Forces hashing from scratch
..Default::default() ..Default::default()
@ -78,7 +78,7 @@ async fn dry_run<DB: Database>(
while !exec_output { while !exec_output {
exec_output = exec_stage exec_output = exec_stage
.execute( .execute(
&mut provider, &provider,
reth_stages::ExecInput { reth_stages::ExecInput {
target: Some(to), target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)), checkpoint: Some(StageCheckpoint::new(from)),

View File

@ -49,7 +49,7 @@ async fn unwind_and_copy<DB: Database>(
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let (from, to) = range; let (from, to) = range;
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone()); let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let unwind = UnwindInput { let unwind = UnwindInput {
unwind_to: from, unwind_to: from,
@ -61,10 +61,10 @@ async fn unwind_and_copy<DB: Database>(
// Unwind hashes all the way to FROM // Unwind hashes all the way to FROM
StorageHashingStage::default().unwind(&mut provider, unwind).await.unwrap(); StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut provider, unwind).await.unwrap(); AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();
MerkleStage::default_unwind().unwind(&mut provider, unwind).await?; MerkleStage::default_unwind().unwind(&provider, unwind).await?;
// Bring Plainstate to TO (hashing stage execution requires it) // Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new( let mut exec_stage = ExecutionStage::new(
@ -74,7 +74,7 @@ async fn unwind_and_copy<DB: Database>(
exec_stage exec_stage
.unwind( .unwind(
&mut provider, &provider,
UnwindInput { UnwindInput {
unwind_to: to, unwind_to: to,
checkpoint: StageCheckpoint::new(tip_block_number), checkpoint: StageCheckpoint::new(tip_block_number),
@ -86,11 +86,11 @@ async fn unwind_and_copy<DB: Database>(
// Bring hashes to TO // Bring hashes to TO
AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut provider, execute_input) .execute(&provider, execute_input)
.await .await
.unwrap(); .unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut provider, execute_input) .execute(&provider, execute_input)
.await .await
.unwrap(); .unwrap();
@ -116,7 +116,7 @@ async fn dry_run<DB: Database>(
) -> eyre::Result<()> { ) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage."); info!(target: "reth::cli", "Executing stage.");
let factory = ProviderFactory::new(&output_db, chain); let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?; let provider = factory.provider_rw()?;
let mut exec_output = false; let mut exec_output = false;
while !exec_output { while !exec_output {
exec_output = MerkleStage::Execution { exec_output = MerkleStage::Execution {
@ -125,7 +125,7 @@ async fn dry_run<DB: Database>(
* scratch */ * scratch */
} }
.execute( .execute(
&mut provider, &provider,
reth_stages::ExecInput { reth_stages::ExecInput {
target: Some(to), target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)), checkpoint: Some(StageCheckpoint::new(from)),

View File

@ -229,7 +229,7 @@ impl Command {
if !self.skip_unwind { if !self.skip_unwind {
while unwind.checkpoint.block_number > self.from { while unwind.checkpoint.block_number > self.from {
let unwind_output = unwind_stage.unwind(&mut provider_rw, unwind).await?; let unwind_output = unwind_stage.unwind(&provider_rw, unwind).await?;
unwind.checkpoint = unwind_output.checkpoint; unwind.checkpoint = unwind_output.checkpoint;
if self.commit { if self.commit {
@ -245,7 +245,7 @@ impl Command {
}; };
while let ExecOutput { checkpoint: stage_progress, done: false } = while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut provider_rw, input).await? exec_stage.execute(&provider_rw, input).await?
{ {
input.checkpoint = Some(stage_progress); input.checkpoint = Some(stage_progress);

View File

@ -137,8 +137,8 @@ fn measure_stage_with_path<F, S>(
|_| async { |_| async {
let mut stage = stage.clone(); let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
stage.execute(&mut provider, input).await.unwrap(); stage.execute(&provider, input).await.unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
}, },
) )

View File

@ -63,8 +63,8 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
std::fs::create_dir_all(&path).unwrap(); std::fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display()); println!("Account Hashing testdata not found, generating to {:?}", path.display());
let tx = TestTransaction::new(&path); let tx = TestTransaction::new(&path);
let mut provider = tx.inner_rw(); let provider = tx.inner_rw();
let _accounts = AccountHashingStage::seed(&mut provider, opts); let _accounts = AccountHashingStage::seed(&provider, opts);
provider.commit().expect("failed to commit"); provider.commit().expect("failed to commit");
} }
(path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default())) (path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default()))

View File

@ -42,11 +42,11 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Env<WriteMap>>>(
tokio::runtime::Runtime::new().unwrap().block_on(async { tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone(); let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
// Clear previous run // Clear previous run
stage stage
.unwind(&mut provider, unwind) .unwind(&provider, unwind)
.await .await
.map_err(|e| { .map_err(|e| {
format!( format!(
@ -70,16 +70,16 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<Env<WriteMap>>>(
tokio::runtime::Runtime::new().unwrap().block_on(async { tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone(); let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
StorageHashingStage::default().unwind(&mut provider, unwind).await.unwrap(); StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut provider, unwind).await.unwrap(); AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();
// Clear previous run // Clear previous run
stage.unwind(&mut provider, unwind).await.unwrap(); stage.unwind(&provider, unwind).await.unwrap();
AccountHashingStage::default().execute(&mut provider, input).await.unwrap(); AccountHashingStage::default().execute(&provider, input).await.unwrap();
StorageHashingStage::default().execute(&mut provider, input).await.unwrap(); StorageHashingStage::default().execute(&provider, input).await.unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
}); });

View File

@ -276,7 +276,7 @@ where
let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
let output = stage.unwind(&mut provider_rw, input).await; let output = stage.unwind(&provider_rw, input).await;
match output { match output {
Ok(unwind_output) => { Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint; checkpoint = unwind_output.checkpoint;
@ -358,7 +358,7 @@ where
}); });
match stage match stage
.execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint }) .execute(&provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.await .await
{ {
Ok(out @ ExecOutput { checkpoint, done }) => { Ok(out @ ExecOutput { checkpoint, done }) => {

View File

@ -181,14 +181,14 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError>; ) -> Result<ExecOutput, StageError>;
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError>; ) -> Result<UnwindOutput, StageError>;
} }

View File

@ -67,7 +67,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// header, limited by the stage's batch size. /// header, limited by the stage's batch size.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -163,7 +163,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();

View File

@ -87,7 +87,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
/// Execute the stage. /// Execute the stage.
pub fn execute_inner<DB: Database>( pub fn execute_inner<DB: Database>(
&self, &self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -262,7 +262,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
/// Execute the stage /// Execute the stage
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
// For Ethereum transactions that reaches the max call depth (1024) revm can use more stack // For Ethereum transactions that reaches the max call depth (1024) revm can use more stack
@ -289,7 +289,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();
@ -583,6 +583,7 @@ mod tests {
// insert pre state // insert pre state
let provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_ref(); let db_tx = provider.tx_ref();
let acc1 = H160(hex!("1000000000000000000000000000000000000000")); let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")); let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"));
@ -604,9 +605,9 @@ mod tests {
db_tx.put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into())).unwrap(); db_tx.put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into())).unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let mut execution_stage = stage(); let mut execution_stage = stage();
let output = execution_stage.execute(&mut provider, input).await.unwrap(); let output = execution_stage.execute(&provider, input).await.unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
assert_matches!(output, ExecOutput { assert_matches!(output, ExecOutput {
checkpoint: StageCheckpoint { checkpoint: StageCheckpoint {
@ -696,6 +697,7 @@ mod tests {
let code_hash = keccak256(code); let code_hash = keccak256(code);
// pre state // pre state
let provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_ref(); let db_tx = provider.tx_ref();
let acc1 = H160(hex!("1000000000000000000000000000000000000000")); let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc1_info = Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) }; let acc1_info = Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) };
@ -708,16 +710,16 @@ mod tests {
provider.commit().unwrap(); provider.commit().unwrap();
// execute // execute
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let mut execution_stage = stage(); let mut execution_stage = stage();
let result = execution_stage.execute(&mut provider, input).await.unwrap(); let result = execution_stage.execute(&provider, input).await.unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let mut stage = stage(); let mut stage = stage();
let result = stage let result = stage
.unwind( .unwind(
&mut provider, &provider,
UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None }, UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None },
) )
.await .await
@ -811,9 +813,9 @@ mod tests {
provider.commit().unwrap(); provider.commit().unwrap();
// execute // execute
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let mut execution_stage = stage(); let mut execution_stage = stage();
let _ = execution_stage.execute(&mut provider, input).await.unwrap(); let _ = execution_stage.execute(&provider, input).await.unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
// assert unwind stage // assert unwind stage

View File

@ -18,7 +18,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
async fn execute( async fn execute(
&mut self, &mut self,
_provider: &mut DatabaseProviderRW<'_, &DB>, _provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
@ -26,7 +26,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
async fn unwind( async fn unwind(
&mut self, &mut self,
_provider: &mut DatabaseProviderRW<'_, &DB>, _provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })

View File

@ -79,7 +79,7 @@ impl AccountHashingStage {
/// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the /// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the
/// account state in the `AccountChangeSet` table. /// account state in the `AccountChangeSet` table.
pub fn seed<DB: Database>( pub fn seed<DB: Database>(
provider: &mut DatabaseProviderRW<'_, DB>, provider: &DatabaseProviderRW<'_, DB>,
opts: SeedOpts, opts: SeedOpts,
) -> Result<Vec<(reth_primitives::Address, reth_primitives::Account)>, StageError> { ) -> Result<Vec<(reth_primitives::Address, reth_primitives::Account)>, StageError> {
use reth_db::models::AccountBeforeTx; use reth_db::models::AccountBeforeTx;
@ -135,7 +135,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -267,7 +267,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) = let (range, unwind_progress, _) =
@ -535,9 +535,9 @@ mod tests {
type Seed = Vec<(Address, Account)>; type Seed = Vec<(Address, Account)>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> { fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let mut provider = self.tx.inner_rw(); let provider = self.tx.inner_rw();
let res = Ok(AccountHashingStage::seed( let res = Ok(AccountHashingStage::seed(
&mut provider, &provider,
SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 }, SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 },
) )
.unwrap()); .unwrap());

View File

@ -54,7 +54,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();
@ -193,7 +193,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) = let (range, unwind_progress, _) =

View File

@ -20,7 +20,6 @@ use reth_primitives::{
BlockHashOrNumber, BlockNumber, SealedHeader, H256, BlockHashOrNumber, BlockNumber, SealedHeader, H256,
}; };
use reth_provider::DatabaseProviderRW; use reth_provider::DatabaseProviderRW;
use std::ops::Deref;
use tokio::sync::watch; use tokio::sync::watch;
use tracing::*; use tracing::*;
@ -196,14 +195,14 @@ where
/// starting from the tip of the chain /// starting from the tip of the chain
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();
let current_checkpoint = input.checkpoint(); let current_checkpoint = input.checkpoint();
// Lookup the head and tip of the sync range // Lookup the head and tip of the sync range
let gap = self.get_sync_gap(provider.deref(), current_checkpoint.block_number).await?; let gap = self.get_sync_gap(provider, current_checkpoint.block_number).await?;
let local_head = gap.local_head.number; let local_head = gap.local_head.number;
let tip = gap.target.tip(); let tip = gap.target.tip();
@ -326,7 +325,7 @@ where
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
// TODO: handle bad block // TODO: handle bad block

View File

@ -37,7 +37,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -56,7 +56,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) = let (range, unwind_progress, _) =
@ -142,8 +142,8 @@ mod tests {
let input = ExecInput { target: Some(run_to), ..Default::default() }; let input = ExecInput { target: Some(run_to), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default(); let mut stage = IndexAccountHistoryStage::default();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let out = stage.execute(&mut provider, input).await.unwrap(); let out = stage.execute(&provider, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap(); provider.commit().unwrap();
} }
@ -156,8 +156,8 @@ mod tests {
}; };
let mut stage = IndexAccountHistoryStage::default(); let mut stage = IndexAccountHistoryStage::default();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let out = stage.unwind(&mut provider, input).await.unwrap(); let out = stage.unwind(&provider, input).await.unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }); assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
provider.commit().unwrap(); provider.commit().unwrap();
} }

View File

@ -37,7 +37,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -55,7 +55,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) = let (range, unwind_progress, _) =
@ -150,8 +150,8 @@ mod tests {
let input = ExecInput { target: Some(run_to), ..Default::default() }; let input = ExecInput { target: Some(run_to), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default(); let mut stage = IndexStorageHistoryStage::default();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let out = stage.execute(&mut provider, input).await.unwrap(); let out = stage.execute(&provider, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap(); provider.commit().unwrap();
} }
@ -164,8 +164,8 @@ mod tests {
}; };
let mut stage = IndexStorageHistoryStage::default(); let mut stage = IndexStorageHistoryStage::default();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let out = stage.unwind(&mut provider, input).await.unwrap(); let out = stage.unwind(&provider, input).await.unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }); assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
provider.commit().unwrap(); provider.commit().unwrap();
} }

View File

@ -145,7 +145,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
/// Execute the stage. /// Execute the stage.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let threshold = match self { let threshold = match self {
@ -281,7 +281,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();

View File

@ -56,7 +56,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
/// the [`TxSenders`][reth_db::tables::TxSenders] table. /// the [`TxSenders`][reth_db::tables::TxSenders] table.
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -167,7 +167,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);

View File

@ -51,7 +51,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
/// Write total difficulty entries /// Write total difficulty entries
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();
@ -98,7 +98,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);

View File

@ -51,7 +51,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
/// Write transaction hash -> id entries /// Write transaction hash -> id entries
async fn execute( async fn execute(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if input.target_reached() { if input.target_reached() {
@ -136,7 +136,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
/// Unwind the stage. /// Unwind the stage.
async fn unwind( async fn unwind(
&mut self, &mut self,
provider: &mut DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref(); let tx = provider.tx_ref();

View File

@ -48,9 +48,9 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner {
let (db, mut stage) = (self.tx().inner_raw(), self.stage()); let (db, mut stage) = (self.tx().inner_raw(), self.stage());
tokio::spawn(async move { tokio::spawn(async move {
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let result = stage.execute(&mut provider, input).await; let result = stage.execute(&provider, input).await;
provider.commit().expect("failed to commit"); provider.commit().expect("failed to commit");
tx.send(result).expect("failed to send message") tx.send(result).expect("failed to send message")
}); });
@ -74,9 +74,9 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner {
let (db, mut stage) = (self.tx().inner_raw(), self.stage()); let (db, mut stage) = (self.tx().inner_raw(), self.stage());
tokio::spawn(async move { tokio::spawn(async move {
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let result = stage.unwind(&mut provider, input).await; let result = stage.unwind(&provider, input).await;
provider.commit().expect("failed to commit"); provider.commit().expect("failed to commit");
tx.send(result).expect("failed to send result"); tx.send(result).expect("failed to send result");
}); });

View File

@ -48,7 +48,7 @@ impl<DB: Database> Stage<DB> for TestStage {
async fn execute( async fn execute(
&mut self, &mut self,
_: &mut DatabaseProviderRW<'_, &DB>, _: &DatabaseProviderRW<'_, &DB>,
_input: ExecInput, _input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
self.exec_outputs self.exec_outputs
@ -58,7 +58,7 @@ impl<DB: Database> Stage<DB> for TestStage {
async fn unwind( async fn unwind(
&mut self, &mut self,
_: &mut DatabaseProviderRW<'_, &DB>, _: &DatabaseProviderRW<'_, &DB>,
_input: UnwindInput, _input: UnwindInput,
) -> Result<UnwindOutput, StageError> { ) -> Result<UnwindOutput, StageError> {
self.unwind_outputs self.unwind_outputs

View File

@ -26,7 +26,7 @@ mod test {
.build(); .build();
let factory = ProviderFactory::new(db.as_ref(), Arc::new(chain_spec.clone())); let factory = ProviderFactory::new(db.as_ref(), Arc::new(chain_spec.clone()));
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let data = BlockChainTestData::default(); let data = BlockChainTestData::default();
let genesis = data.genesis.clone(); let genesis = data.genesis.clone();
@ -144,7 +144,7 @@ mod test {
); );
let factory = ProviderFactory::new(db.as_ref(), chain_spec.clone()); let factory = ProviderFactory::new(db.as_ref(), chain_spec.clone());
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
let data = BlockChainTestData::default(); let data = BlockChainTestData::default();
let genesis = data.genesis.clone(); let genesis = data.genesis.clone();

View File

@ -77,7 +77,7 @@ impl Case for BlockchainTestCase {
// Create the database // Create the database
let db = create_test_rw_db(); let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), Arc::new(case.network.clone().into())); let factory = ProviderFactory::new(db.as_ref(), Arc::new(case.network.clone().into()));
let mut provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
// Insert test state // Insert test state
provider.insert_block( provider.insert_block(
@ -105,10 +105,7 @@ impl Case for BlockchainTestCase {
.block_on(async { .block_on(async {
// ignore error // ignore error
let _ = stage let _ = stage
.execute( .execute(&provider, ExecInput { target: last_block, checkpoint: None })
&mut provider,
ExecInput { target: last_block, checkpoint: None },
)
.await; .await;
}); });
} }