fix: fail on unwind during reth import (#12062)

This commit is contained in:
Arsenii Kulikov
2024-10-25 16:10:20 +04:00
committed by GitHub
parent a87d654c55
commit 58441c158b
5 changed files with 30 additions and 2 deletions

View File

@ -207,6 +207,7 @@ where
.with_tip_sender(tip_tx) .with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty // we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block) .with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),

View File

@ -75,6 +75,7 @@ where
.with_tip_sender(tip_tx) .with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty // we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block) .with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),

View File

@ -188,4 +188,7 @@ pub enum PipelineError {
/// Internal error /// Internal error
#[error(transparent)] #[error(transparent)]
Internal(#[from] RethError), Internal(#[from] RethError),
/// The pipeline encountered an unwind when `fail_on_unwind` was set to `true`.
#[error("unexpected unwind")]
UnexpectedUnwind,
} }

View File

@ -14,6 +14,7 @@ pub struct PipelineBuilder<Provider> {
/// A receiver for the current chain tip to sync to. /// A receiver for the current chain tip to sync to.
tip_tx: Option<watch::Sender<B256>>, tip_tx: Option<watch::Sender<B256>>,
metrics_tx: Option<MetricEventsSender>, metrics_tx: Option<MetricEventsSender>,
fail_on_unwind: bool,
} }
impl<Provider> PipelineBuilder<Provider> { impl<Provider> PipelineBuilder<Provider> {
@ -62,6 +63,12 @@ impl<Provider> PipelineBuilder<Provider> {
self self
} }
/// Set whether pipeline should fail on unwind.
pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
self.fail_on_unwind = yes;
self
}
/// Builds the final [`Pipeline`] using the given database. /// Builds the final [`Pipeline`] using the given database.
pub fn build<N>( pub fn build<N>(
self, self,
@ -72,7 +79,7 @@ impl<Provider> PipelineBuilder<Provider> {
N: ProviderNodeTypes, N: ProviderNodeTypes,
ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>, ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
{ {
let Self { stages, max_block, tip_tx, metrics_tx } = self; let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
Pipeline { Pipeline {
provider_factory, provider_factory,
stages, stages,
@ -82,13 +89,20 @@ impl<Provider> PipelineBuilder<Provider> {
event_sender: Default::default(), event_sender: Default::default(),
progress: Default::default(), progress: Default::default(),
metrics_tx, metrics_tx,
fail_on_unwind,
} }
} }
} }
impl<Provider> Default for PipelineBuilder<Provider> { impl<Provider> Default for PipelineBuilder<Provider> {
fn default() -> Self { fn default() -> Self {
Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None } Self {
stages: Vec::new(),
max_block: None,
tip_tx: None,
metrics_tx: None,
fail_on_unwind: false,
}
} }
} }
@ -97,6 +111,7 @@ impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
f.debug_struct("PipelineBuilder") f.debug_struct("PipelineBuilder")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>()) .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block) .field("max_block", &self.max_block)
.field("fail_on_unwind", &self.fail_on_unwind)
.finish() .finish()
} }
} }

View File

@ -78,6 +78,9 @@ pub struct Pipeline<N: ProviderNodeTypes> {
/// A receiver for the current chain tip to sync to. /// A receiver for the current chain tip to sync to.
tip_tx: Option<watch::Sender<B256>>, tip_tx: Option<watch::Sender<B256>>,
metrics_tx: Option<MetricEventsSender>, metrics_tx: Option<MetricEventsSender>,
/// Whether an unwind should fail the syncing process. Should only be set when downloading
/// blocks from trusted sources and expecting them to be valid.
fail_on_unwind: bool,
} }
impl<N: ProviderNodeTypes> Pipeline<N> { impl<N: ProviderNodeTypes> Pipeline<N> {
@ -164,6 +167,10 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
loop { loop {
let next_action = self.run_loop().await?; let next_action = self.run_loop().await?;
if next_action.is_unwind() && self.fail_on_unwind {
return Err(PipelineError::UnexpectedUnwind)
}
// Terminate the loop early if it's reached the maximum user // Terminate the loop early if it's reached the maximum user
// configured block. // configured block.
if next_action.should_continue() && if next_action.should_continue() &&
@ -586,6 +593,7 @@ impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>()) .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block) .field("max_block", &self.max_block)
.field("event_sender", &self.event_sender) .field("event_sender", &self.event_sender)
.field("fail_on_unwind", &self.fail_on_unwind)
.finish() .finish()
} }
} }