diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 280a6e39b..50a94252b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -114,42 +114,33 @@ impl Command { // TODO: Remove magic numbers let fetch_client = Arc::new(network.fetch_client().await?); let mut pipeline = reth_stages::Pipeline::new() - .push( - HeaderStage { - downloader: headers::linear::LinearDownloadBuilder::default() - .batch_size(config.stages.headers.downloader_batch_size) - .retries(config.stages.headers.downloader_retries) - .build(consensus.clone(), fetch_client.clone()), - consensus: consensus.clone(), - client: fetch_client.clone(), - network_handle: network.clone(), - commit_threshold: config.stages.headers.commit_threshold, - }, - false, - ) - .push( - BodyStage { - downloader: Arc::new( - bodies::concurrent::ConcurrentDownloader::new( - fetch_client.clone(), - consensus.clone(), - ) - .with_batch_size(config.stages.bodies.downloader_batch_size) - .with_retries(config.stages.bodies.downloader_retries) - .with_concurrency(config.stages.bodies.downloader_concurrency), - ), - consensus: consensus.clone(), - commit_threshold: config.stages.bodies.commit_threshold, - }, - false, - ) - .push( - SendersStage { - batch_size: config.stages.senders.batch_size, - commit_threshold: config.stages.senders.commit_threshold, - }, - false, - ); + .push(HeaderStage { + downloader: headers::linear::LinearDownloadBuilder::default() + .batch_size(config.stages.headers.downloader_batch_size) + .retries(config.stages.headers.downloader_retries) + .build(consensus.clone(), fetch_client.clone()), + consensus: consensus.clone(), + client: fetch_client.clone(), + network_handle: network.clone(), + commit_threshold: config.stages.headers.commit_threshold, + }) + .push(BodyStage { + downloader: Arc::new( + bodies::concurrent::ConcurrentDownloader::new( + fetch_client.clone(), + consensus.clone(), + ) + .with_batch_size(config.stages.bodies.downloader_batch_size) + .with_retries(config.stages.bodies.downloader_retries) + .with_concurrency(config.stages.bodies.downloader_concurrency), + ), + consensus: consensus.clone(), + commit_threshold: config.stages.bodies.commit_threshold, + }) + .push(SendersStage { + batch_size: config.stages.senders.batch_size, + commit_threshold: config.stages.senders.commit_threshold, + }); if let Some(tip) = self.tip { debug!("Tip manually set: {}", tip); diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 0d84293e4..0af006607 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -108,24 +108,19 @@ impl Pipeline { /// # Unwinding /// /// The unwind priority is set to 0. - pub fn push(self, stage: S, require_tip: bool) -> Self + pub fn push(self, stage: S) -> Self where S: Stage + 'static, { - self.push_with_unwind_priority(stage, require_tip, 0) + self.push_with_unwind_priority(stage, 0) } /// Add a stage to the pipeline, specifying the unwind priority. - pub fn push_with_unwind_priority( - mut self, - stage: S, - require_tip: bool, - unwind_priority: usize, - ) -> Self + pub fn push_with_unwind_priority(mut self, stage: S, unwind_priority: usize) -> Self where S: Stage + 'static, { - self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority }); + self.stages.push(QueuedStage { stage: Box::new(stage), unwind_priority }); self } @@ -152,7 +147,6 @@ impl Pipeline { max_block: self.max_block, maximum_progress: None, minimum_progress: None, - reached_tip: true, }; let next_action = self.run_loop(&mut state, db.as_ref()).await?; @@ -277,9 +271,6 @@ struct QueuedStage { stage: Box>, /// The unwind priority of the stage. unwind_priority: usize, - /// Whether or not this stage can only execute when we reach what we believe to be the tip of - /// the chain. - require_tip: bool, } impl QueuedStage { @@ -291,19 +282,6 @@ impl QueuedStage { db: &DB, ) -> Result { let stage_id = self.stage.id(); - if self.require_tip && !state.reached_tip() { - warn!( - target: "sync::pipeline", - stage = %stage_id, - "Tip not reached as required by stage, skipping." - ); - state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; - - // Stage requires us to reach the tip of the chain first, but we have - // not. - return Ok(ControlFlow::Continue) - } - loop { let mut tx = Transaction::new(db)?; @@ -321,7 +299,6 @@ impl QueuedStage { state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // We reached the maximum block, so we skip the stage - state.set_reached_tip(true); return Ok(ControlFlow::Continue) } @@ -335,7 +312,7 @@ impl QueuedStage { .execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress }) .await { - Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { + Ok(out @ ExecOutput { stage_progress, done }) => { info!( target: "sync::pipeline", stage = %stage_id, @@ -354,7 +331,6 @@ impl QueuedStage { tx.commit()?; state.record_progress_outliers(stage_progress); - state.set_reached_tip(reached_tip); if done { return Ok(ControlFlow::Continue) @@ -422,20 +398,12 @@ mod tests { tokio::spawn(async move { Pipeline::>::new_with_channel(tx) .push( - TestStage::new(StageId("A")).add_exec(Ok(ExecOutput { - stage_progress: 20, - done: true, - reached_tip: true, - })), - false, + TestStage::new(StageId("A")) + .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })), ) .push( - TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + TestStage::new(StageId("B")) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .run(db) @@ -449,12 +417,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 20, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 20, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, ] ); @@ -471,23 +439,13 @@ mod tests { let mut pipeline = Pipeline::>::new() .push( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 100, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 100, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, ) .push( TestStage::new(StageId("B")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, ) .set_max_block(Some(10)); @@ -544,18 +502,9 @@ mod tests { Pipeline::>::new() .push( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .push( TestStage::new(StageId("B")) @@ -564,12 +513,7 @@ mod tests { error: consensus::Error::BaseFeeMissing, })) .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .set_channel(tx) @@ -585,7 +529,7 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Error { stage_id: StageId("B") }, @@ -600,12 +544,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, ] ); @@ -631,35 +575,20 @@ mod tests { let mut pipeline = Pipeline::>::new() .push_with_unwind_priority( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 1, ) .push_with_unwind_priority( TestStage::new(StageId("B")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 10, ) .push_with_unwind_priority( TestStage::new(StageId("C")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 5, ) .set_max_block(Some(10)); @@ -703,66 +632,6 @@ mod tests { ); } - /// Runs a simple pipeline. - #[tokio::test] - async fn skips_stages_that_require_tip() { - let (tx, rx) = channel(2); - let db = test_utils::create_test_db(EnvKind::RW); - - // Run pipeline - tokio::spawn(async move { - Pipeline::>::new_with_channel(tx) - .push( - TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 5, - done: true, - reached_tip: false, - })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, - ) - .push( - TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - true, - ) - .set_max_block(Some(10)) - .run(db) - .await - }); - - // Check that the stages were run in order - assert_eq!( - ReceiverStream::new(rx).collect::>().await, - vec![ - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, - PipelineEvent::Ran { - stage_id: StageId("A"), - result: ExecOutput { stage_progress: 5, reached_tip: false, done: true } - }, - PipelineEvent::Skipped { stage_id: StageId("B") }, - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(5) }, - PipelineEvent::Ran { - stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } - }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, - PipelineEvent::Ran { - stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } - }, - ] - ); - } - /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones. #[tokio::test] async fn pipeline_error_handling() { @@ -772,8 +641,7 @@ mod tests { .push( TestStage::new(StageId("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) - .add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .run(db) @@ -783,12 +651,9 @@ mod tests { // Fatal let db = test_utils::create_test_db(EnvKind::RW); let result = Pipeline::>::new() - .push( - TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( - DatabaseIntegrityError::BlockBody { number: 5 }, - ))), - false, - ) + .push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( + DatabaseIntegrityError::BlockBody { number: 5 }, + )))) .run(db) .await; assert_matches!( diff --git a/crates/stages/src/pipeline/state.rs b/crates/stages/src/pipeline/state.rs index 89154c0d8..aa393a925 100644 --- a/crates/stages/src/pipeline/state.rs +++ b/crates/stages/src/pipeline/state.rs @@ -12,11 +12,6 @@ pub(crate) struct PipelineState { pub(crate) maximum_progress: Option, /// The minimum progress achieved by any stage during the execution of the pipeline. pub(crate) minimum_progress: Option, - /// Whether or not the previous stage reached the tip of the chain. - /// - /// **Do not use this** under normal circumstances. Instead, opt for - /// [PipelineState::reached_tip] and [PipelineState::set_reached_tip]. - pub(crate) reached_tip: bool, } impl PipelineState { @@ -27,49 +22,12 @@ impl PipelineState { self.minimum_progress = opt::min(self.minimum_progress, stage_progress); self.maximum_progress = opt::max(self.maximum_progress, stage_progress); } - - /// Whether or not the pipeline reached the tip of the chain. - pub(crate) fn reached_tip(&self) -> bool { - self.reached_tip || - self.max_block - .zip(self.minimum_progress) - .map_or(false, |(target, progress)| progress >= target) - } - - /// Set whether or not the pipeline has reached the tip of the chain. - pub(crate) fn set_reached_tip(&mut self, flag: bool) { - self.reached_tip = flag; - } } #[cfg(test)] mod tests { use super::*; - #[test] - fn reached_tip() { - let mut state = PipelineState { - events_sender: MaybeSender::new(None), - max_block: None, - maximum_progress: None, - minimum_progress: None, - reached_tip: false, - }; - - // default - assert!(!state.reached_tip()); - - // reached tip - state.set_reached_tip(true); - assert!(state.reached_tip()); - - // reached max block - state.set_reached_tip(false); - state.max_block = Some(1); - state.minimum_progress = Some(1); - assert!(state.reached_tip()); - } - #[test] fn record_progress_outliers() { let mut state = PipelineState { @@ -77,7 +35,6 @@ mod tests { max_block: None, maximum_progress: None, minimum_progress: None, - reached_tip: false, }; state.record_progress_outliers(10); diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 7ba1e03ee..7534dd250 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -37,8 +37,6 @@ pub struct ExecOutput { pub stage_progress: BlockNumber, /// Whether or not the stage is done. pub done: bool, - /// Whether or not the stage reached the tip of the chain. - pub reached_tip: bool, } /// The output of a stage unwinding. diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index d4d598ae5..cd53a50b9 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -93,7 +93,7 @@ impl Stage for BodyStage(tx, starting_block, target)?; @@ -124,7 +124,6 @@ impl Stage for BodyStage Stage for BodyStage= 10 + Ok(ExecOutput { stage_progress, done: false }) if stage_progress >= 10 ); let first_run_progress = first_run.unwrap().stage_progress; @@ -392,7 +388,7 @@ mod tests { let output = rx.await.unwrap(); assert_matches!( output, - Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress > first_run_progress + Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); } @@ -432,7 +428,7 @@ mod tests { // Check that the error bubbles up assert_matches!( rx.await.unwrap(), - Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + Ok(ExecOutput { stage_progress: out_stage_progress, done: false }) if out_stage_progress == stage_progress ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); @@ -462,7 +458,7 @@ mod tests { let output = rx.await.unwrap(); assert_matches!( output, - Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage + Ok(ExecOutput { stage_progress, done: true }) if stage_progress == previous_stage ); let stage_progress = output.unwrap().stage_progress; runner @@ -521,7 +517,7 @@ mod tests { // Check that the error bubbles up assert_matches!( rx.await.unwrap(), - Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + Ok(ExecOutput { stage_progress: out_stage_progress, done: false }) if out_stage_progress == stage_progress ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 092153b36..ced9cebca 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -108,7 +108,7 @@ impl Stage for ExecutionStage { // no more canonical blocks, we are done with execution. if canonical_batch.is_empty() { - return Ok(ExecOutput { stage_progress: last_block, done: true, reached_tip: true }) + return Ok(ExecOutput { stage_progress: last_block, done: true }) } // Get block headers and bodies from canonical hashes @@ -262,7 +262,7 @@ impl Stage for ExecutionStage { let last_block = last_block + canonical_batch.len() as u64; let is_done = canonical_batch.len() < BATCH_SIZE as usize; - Ok(ExecOutput { done: is_done, reached_tip: true, stage_progress: last_block }) + Ok(ExecOutput { done: is_done, stage_progress: last_block }) } /// Unwind the stage. @@ -414,7 +414,7 @@ mod tests { execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated(); let output = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); - assert_eq!(output, ExecOutput { stage_progress: 1, done: true, reached_tip: true }); + assert_eq!(output, ExecOutput { stage_progress: 1, done: true }); let tx = tx.deref_mut(); // check post state let account1 = H160(hex!("1000000000000000000000000000000000000000")); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 43086d3ea..796d95966 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -142,7 +142,7 @@ impl Stage for SendersStage { let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); if max_block_num <= stage_progress { - return Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) + return Ok(ExecOutput { stage_progress, done: true }) } // Look up the start index for the transaction range @@ -74,7 +74,7 @@ impl Stage for SendersStage { // No transactions to walk over if start_tx_index > end_tx_index { - return Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true }) + return Ok(ExecOutput { stage_progress: max_block_num, done: true }) } // Acquire the cursor for inserting elements @@ -106,7 +106,7 @@ impl Stage for SendersStage { } let done = max_block_num >= previous_stage_progress; - Ok(ExecOutput { stage_progress: max_block_num, done, reached_tip: done }) + Ok(ExecOutput { stage_progress: max_block_num, done }) } /// Unwind the stage. @@ -168,8 +168,8 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); // Validate the stage execution @@ -196,7 +196,7 @@ mod tests { let expected_progress = stage_progress + threshold; assert_matches!( result, - Ok(ExecOutput { done: false, reached_tip: false, stage_progress }) + Ok(ExecOutput { done: false, stage_progress }) if stage_progress == expected_progress ); @@ -208,7 +208,7 @@ mod tests { let result = runner.execute(second_input).await.unwrap(); assert_matches!( result, - Ok(ExecOutput { done: true, reached_tip: true, stage_progress }) + Ok(ExecOutput { done: true, stage_progress }) if stage_progress == previous_stage ); diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 4a25ee186..b282e9b9e 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -40,8 +40,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); // Validate the stage execution @@ -90,8 +90,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); assert!(runner.validate_execution(execute_input, result.ok()).is_ok(), "execution validation"); @@ -142,8 +142,8 @@ macro_rules! stage_test_suite_ext { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == stage_progress + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == stage_progress ); // Validate the stage execution