From 58441c158b27986b904efccf6cbe5953d090e083 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 25 Oct 2024 16:10:20 +0400 Subject: [PATCH] fix: fail on unwind during `reth import` (#12062) --- crates/cli/commands/src/import.rs | 1 + .../cli/src/commands/build_pipeline.rs | 1 + crates/stages/api/src/error.rs | 3 +++ crates/stages/api/src/pipeline/builder.rs | 19 +++++++++++++++++-- crates/stages/api/src/pipeline/mod.rs | 8 ++++++++ 5 files changed, 30 insertions(+), 2 deletions(-) diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index 6b750d32a..a7c81e530 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -207,6 +207,7 @@ where .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty .with_max_block(max_block) + .with_fail_on_unwind(true) .add_stages( DefaultStages::new( provider_factory.clone(), diff --git a/crates/optimism/cli/src/commands/build_pipeline.rs b/crates/optimism/cli/src/commands/build_pipeline.rs index f23cb9a7c..a197f93a8 100644 --- a/crates/optimism/cli/src/commands/build_pipeline.rs +++ b/crates/optimism/cli/src/commands/build_pipeline.rs @@ -75,6 +75,7 @@ where .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty .with_max_block(max_block) + .with_fail_on_unwind(true) .add_stages( DefaultStages::new( provider_factory.clone(), diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index 68e1d00fd..8562b10b6 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -188,4 +188,7 @@ pub enum PipelineError { /// Internal error #[error(transparent)] Internal(#[from] RethError), + /// The pipeline encountered an unwind when `fail_on_unwind` was set to `true`. + #[error("unexpected unwind")] + UnexpectedUnwind, } diff --git a/crates/stages/api/src/pipeline/builder.rs b/crates/stages/api/src/pipeline/builder.rs index 79a4c477e..45bdc2d89 100644 --- a/crates/stages/api/src/pipeline/builder.rs +++ b/crates/stages/api/src/pipeline/builder.rs @@ -14,6 +14,7 @@ pub struct PipelineBuilder { /// A receiver for the current chain tip to sync to. tip_tx: Option>, metrics_tx: Option, + fail_on_unwind: bool, } impl PipelineBuilder { @@ -62,6 +63,12 @@ impl PipelineBuilder { self } + /// Set whether pipeline should fail on unwind. + pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self { + self.fail_on_unwind = yes; + self + } + /// Builds the final [`Pipeline`] using the given database. pub fn build( self, @@ -72,7 +79,7 @@ impl PipelineBuilder { N: ProviderNodeTypes, ProviderFactory: DatabaseProviderFactory, { - let Self { stages, max_block, tip_tx, metrics_tx } = self; + let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self; Pipeline { provider_factory, stages, @@ -82,13 +89,20 @@ impl PipelineBuilder { event_sender: Default::default(), progress: Default::default(), metrics_tx, + fail_on_unwind, } } } impl Default for PipelineBuilder { fn default() -> Self { - Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None } + Self { + stages: Vec::new(), + max_block: None, + tip_tx: None, + metrics_tx: None, + fail_on_unwind: false, + } } } @@ -97,6 +111,7 @@ impl std::fmt::Debug for PipelineBuilder { f.debug_struct("PipelineBuilder") .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) .field("max_block", &self.max_block) + .field("fail_on_unwind", &self.fail_on_unwind) .finish() } } diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 14225a595..399a3ffb4 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -78,6 +78,9 @@ pub struct Pipeline { /// A receiver for the current chain tip to sync to. tip_tx: Option>, metrics_tx: Option, + /// Whether an unwind should fail the syncing process. Should only be set when downloading + /// blocks from trusted sources and expecting them to be valid. + fail_on_unwind: bool, } impl Pipeline { @@ -164,6 +167,10 @@ impl Pipeline { loop { let next_action = self.run_loop().await?; + if next_action.is_unwind() && self.fail_on_unwind { + return Err(PipelineError::UnexpectedUnwind) + } + // Terminate the loop early if it's reached the maximum user // configured block. if next_action.should_continue() && @@ -586,6 +593,7 @@ impl std::fmt::Debug for Pipeline { .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) .field("max_block", &self.max_block) .field("event_sender", &self.event_sender) + .field("fail_on_unwind", &self.fail_on_unwind) .finish() } }