From a759201b40217af147e069498a7d7db869439510 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Mon, 10 Oct 2022 15:52:23 +0200 Subject: [PATCH] refactor(stages): split `Pipeline::run` (#30) --- crates/stages/src/pipeline.rs | 286 +++++++++++++++------------- crates/stages/src/pipeline/ctrl.rs | 14 ++ crates/stages/src/pipeline/state.rs | 43 +++++ crates/stages/src/util.rs | 80 +++++++- 4 files changed, 286 insertions(+), 137 deletions(-) create mode 100644 crates/stages/src/pipeline/ctrl.rs create mode 100644 crates/stages/src/pipeline/state.rs diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index f1c0c7b36..a069cbebd 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -1,7 +1,7 @@ use crate::{ error::*, - util::opt::{self, MaybeSender}, - ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, + util::{db::TxContainer, opt::MaybeSender}, + ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::mdbx; use reth_primitives::BlockNumber; @@ -9,21 +9,13 @@ use std::fmt::{Debug, Formatter}; use tokio::sync::mpsc::Sender; use tracing::*; +mod ctrl; mod event; -pub use event::*; +mod state; -struct QueuedStage<'db, E> -where - E: mdbx::EnvironmentKind, -{ - /// The actual stage to execute. - stage: Box>, - /// The unwind priority of the stage. - unwind_priority: usize, - /// Whether or not this stage can only execute when we reach what we believe to be the tip of - /// the chain. - require_tip: bool, -} +use ctrl::*; +pub use event::*; +use state::*; /// A staged sync pipeline. /// @@ -126,111 +118,70 @@ where /// Run the pipeline. pub async fn run(&mut self, db: &'db mdbx::Environment) -> Result<(), PipelineError> { - let mut previous_stage = None; - let mut minimum_progress: Option = None; - let mut maximum_progress: Option = None; - let mut reached_tip_flag = true; + let mut state = PipelineState { + events_sender: self.events_sender.clone(), + max_block: self.max_block, + maximum_progress: None, + minimum_progress: None, + reached_tip: true, + }; - 'run: loop { - let mut tx = db.begin_rw_txn()?; - for (_, queued_stage) in self.stages.iter_mut().enumerate() { - let stage_id = queued_stage.stage.id(); - let block_reached = loop { - let prev_progress = stage_id.get_progress(&tx)?; - self.events_sender - .send(PipelineEvent::Running { stage_id, stage_progress: prev_progress }) - .await?; + loop { + let mut tx = TxContainer::new(db)?; + match self.run_loop(&mut state, &mut tx).await? { + ControlFlow::Continue => { + tx.commit()?; - // Whether any stage has reached the maximum block, which also counts as having - // reached the tip for stages that have reached the tip - let reached_max_block = maximum_progress + // Check if we've reached our desired target block + if state + .minimum_progress .zip(self.max_block) - .map_or(false, |(progress, target)| progress >= target); - - // Whether this stage reached the max block - let stage_reached_max_block = prev_progress - .zip(self.max_block) - .map_or(false, |(prev_progress, target)| prev_progress >= target); - - // Execute stage - let output = Self::execute_stage( - &mut tx, - queued_stage, - ExecInput { previous_stage, stage_progress: prev_progress }, - reached_tip_flag || reached_max_block, - stage_reached_max_block, - ) - .instrument(info_span!("Running", stage = %stage_id)) - .await; - - if output.is_err() { - self.events_sender - .send(PipelineEvent::Ran { stage_id, result: None }) - .await?; + .map_or(false, |(progress, target)| progress >= target) + { + return Ok(()) } - - match output { - Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { - debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); - stage_id.save_progress(&tx, stage_progress)?; - - self.events_sender - .send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) }) - .await?; - - // TODO: Make the commit interval configurable - tx.commit()?; - tx = db.begin_rw_txn()?; - - // Update our minimum and maximum stage progress - minimum_progress = opt::min(minimum_progress, stage_progress); - maximum_progress = opt::max(maximum_progress, stage_progress); - - if done { - reached_tip_flag = reached_tip; - break stage_progress - } - } - Err(StageError::Validation { block }) => { - debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error."); - - // We unwind because of a validation error. If the unwind itself fails, - // we bail entirely, otherwise we restart the execution loop from the - // beginning. - // - // Note on the drop: The transaction needs to be dropped in order for - // unwind to create a new one. Dropping the - // transaction will abort it; there is no - // other way currently to abort the transaction. It will be re-created - // if the loop restarts. - drop(tx); - match self - .unwind(db, prev_progress.unwrap_or_default(), Some(block)) - .await - { - Ok(()) => continue 'run, - Err(e) => return Err(e), - } - } - Err(e) => return Err(PipelineError::Stage(e)), - } - }; - - // Set previous stage and continue on to next stage. - previous_stage = Some((stage_id, block_reached)); - } - tx.commit()?; - - // Check if we've reached our desired target block - if minimum_progress - .zip(self.max_block) - .map_or(false, |(progress, target)| progress >= target) - { - return Ok(()) + } + ControlFlow::Unwind { .. } => (), } } } + /// Run a single stage loop. + /// + /// A stage loop will run each stage serially. + async fn run_loop<'tx>( + &mut self, + state: &mut PipelineState, + tx: &mut TxContainer<'db, 'tx, E>, + ) -> Result + where + 'db: 'tx, + { + let mut previous_stage = None; + for (_, queued_stage) in self.stages.iter_mut().enumerate() { + let stage_id = queued_stage.stage.id(); + match queued_stage + .execute(state, previous_stage, tx) + .instrument(info_span!("Running", stage = %stage_id)) + .await? + { + ControlFlow::Continue => { + previous_stage = + Some((stage_id, stage_id.get_progress(tx.get())?.unwrap_or_default())); + } + ControlFlow::Unwind { target, bad_block } => { + // TODO: Note on close + tx.close(); + self.unwind(tx.db, target, bad_block).await?; + tx.open()?; + return Ok(ControlFlow::Unwind { target, bad_block }) + } + } + } + + Ok(ControlFlow::Continue) + } + /// Unwind the stages to the target block. /// /// If the unwind is due to a bad block the number of that block should be specified. @@ -311,40 +262,103 @@ where } } -impl<'db, E> Pipeline<'db, E> +/// A container for a queued stage. +struct QueuedStage<'db, E> where E: mdbx::EnvironmentKind, { - async fn execute_stage<'tx>( - tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>, - QueuedStage { stage, require_tip, .. }: &mut QueuedStage<'db, E>, - input: ExecInput, - reached_tip: bool, - stage_reached_max_block: bool, - ) -> Result + /// The actual stage to execute. + stage: Box>, + /// The unwind priority of the stage. + unwind_priority: usize, + /// Whether or not this stage can only execute when we reach what we believe to be the tip of + /// the chain. + require_tip: bool, +} + +impl<'db, E> QueuedStage<'db, E> +where + E: mdbx::EnvironmentKind, +{ + /// Execute the stage. + async fn execute<'tx>( + &mut self, + state: &mut PipelineState, + previous_stage: Option<(StageId, BlockNumber)>, + tx: &mut TxContainer<'db, 'tx, E>, + ) -> Result where 'db: 'tx, { - if !reached_tip && *require_tip { + let stage_id = self.stage.id(); + if self.require_tip && !state.reached_tip() { info!("Tip not reached, skipping."); // Stage requires us to reach the tip of the chain first, but we have // not. - Ok(ExecOutput { - stage_progress: input.stage_progress.unwrap_or_default(), - done: true, - reached_tip: false, - }) - } else if stage_reached_max_block { - info!("Stage reached maximum block, skipping."); - // We reached the maximum block, so we skip the stage - Ok(ExecOutput { - stage_progress: input.stage_progress.unwrap_or_default(), - done: true, - reached_tip: true, - }) - } else { - stage.execute(tx, input).await + return Ok(ControlFlow::Continue) + } + + loop { + let prev_progress = stage_id.get_progress(tx.get())?; + state + .events_sender + .send(PipelineEvent::Running { stage_id, stage_progress: prev_progress }) + .await?; + + let stage_reached_max_block = prev_progress + .zip(state.max_block) + .map_or(false, |(prev_progress, target)| prev_progress >= target); + if stage_reached_max_block { + info!("Stage reached maximum block, skipping."); + + // We reached the maximum block, so we skip the stage + state.set_reached_tip(true); + return Ok(ControlFlow::Continue) + } + + match self + .stage + .execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress }) + .await + { + Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { + debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); + stage_id.save_progress(tx.get_mut(), stage_progress)?; + + state + .events_sender + .send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) }) + .await?; + + // TODO: Make the commit interval configurable + tx.commit()?; + + state.record_progress_outliers(stage_progress); + state.set_reached_tip(reached_tip); + + if done { + return Ok(ControlFlow::Continue) + } + } + Err(err) => { + state.events_sender.send(PipelineEvent::Ran { stage_id, result: None }).await?; + + return if let StageError::Validation { block } = err { + debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error."); + + // We unwind because of a validation error. If the unwind itself fails, + // we bail entirely, otherwise we restart the execution loop from the + // beginning. + Ok(ControlFlow::Unwind { + target: prev_progress.unwrap_or_default(), + bad_block: Some(block), + }) + } else { + Err(err.into()) + } + } + } } } } diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs new file mode 100644 index 000000000..819d5d0a6 --- /dev/null +++ b/crates/stages/src/pipeline/ctrl.rs @@ -0,0 +1,14 @@ +use reth_primitives::BlockNumber; + +/// Determines the control flow during pipeline execution. +pub(crate) enum ControlFlow { + /// An unwind was requested and must be performed before continuing. + Unwind { + /// The block to unwind to. + target: BlockNumber, + /// The block that caused the unwind. + bad_block: Option, + }, + /// The pipeline is allowed to continue executing stages. + Continue, +} diff --git a/crates/stages/src/pipeline/state.rs b/crates/stages/src/pipeline/state.rs new file mode 100644 index 000000000..7d4915f2c --- /dev/null +++ b/crates/stages/src/pipeline/state.rs @@ -0,0 +1,43 @@ +use crate::{ + pipeline::event::PipelineEvent, + util::{opt, opt::MaybeSender}, +}; +use reth_primitives::BlockNumber; + +/// The state of the pipeline during execution. +pub(crate) struct PipelineState { + pub(crate) events_sender: MaybeSender, + pub(crate) max_block: Option, + /// The maximum progress achieved by any stage during the execution of the pipeline. + pub(crate) maximum_progress: Option, + /// The minimum progress achieved by any stage during the execution of the pipeline. + pub(crate) minimum_progress: Option, + /// Whether or not the previous stage reached the tip of the chain. + /// + /// **Do not use this** under normal circumstances. Instead, opt for + /// [PipelineState::reached_tip] and [PipelineState::set_reached_tip]. + pub(crate) reached_tip: bool, +} + +impl PipelineState { + /// Record the progress of a stage, setting the maximum and minimum progress achieved by any + /// stage during the execution of the pipeline. + pub(crate) fn record_progress_outliers(&mut self, stage_progress: BlockNumber) { + // Update our minimum and maximum stage progress + self.minimum_progress = opt::min(self.minimum_progress, stage_progress); + self.maximum_progress = opt::max(self.maximum_progress, stage_progress); + } + + /// Whether or not the pipeline reached the tip of the chain. + pub(crate) fn reached_tip(&self) -> bool { + self.reached_tip || + self.max_block + .zip(self.minimum_progress) + .map_or(false, |(target, progress)| progress >= target) + } + + /// Set whether or not the pipeline has reached the tip of the chain. + pub(crate) fn set_reached_tip(&mut self, flag: bool) { + self.reached_tip = flag; + } +} diff --git a/crates/stages/src/util.rs b/crates/stages/src/util.rs index 9cdfd2010..e49e39f3a 100644 --- a/crates/stages/src/util.rs +++ b/crates/stages/src/util.rs @@ -16,7 +16,7 @@ pub(crate) mod opt { } /// The producing side of a [tokio::mpsc] channel that may or may not be set. - #[derive(Default)] + #[derive(Default, Clone)] pub(crate) struct MaybeSender { inner: Option>, } @@ -61,3 +61,81 @@ pub(crate) mod opt { } } } + +pub(crate) mod db { + use reth_db::mdbx; + + /// A container for a MDBX transaction that will open a new inner transaction when the current + /// one is committed. + // NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in + // the pipeline that just take a reference will not be able to commit their transaction and let + // the pipeline continue. Is there a better way to do this? + pub(crate) struct TxContainer<'db, 'tx, E> + where + 'db: 'tx, + E: mdbx::EnvironmentKind, + { + /// A handle to the MDBX database. + pub(crate) db: &'db mdbx::Environment, + tx: Option>, + } + + impl<'db, 'tx, E> TxContainer<'db, 'tx, E> + where + 'db: 'tx, + E: mdbx::EnvironmentKind, + { + /// Create a new container with the given database handle. + /// + /// A new inner transaction will be opened. + pub(crate) fn new(db: &'db mdbx::Environment) -> Result { + Ok(Self { db, tx: Some(db.begin_rw_txn()?) }) + } + + /// Commit the current inner transaction and open a new one. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. + pub(crate) fn commit(&mut self) -> Result { + let success = + self.tx.take().expect("Tried committing a non-existent transaction").commit()?; + self.tx = Some(self.db.begin_rw_txn()?); + Ok(success) + } + + /// Get the inner transaction. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. + pub(crate) fn get(&self) -> &mdbx::Transaction<'tx, mdbx::RW, E> { + self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction") + } + + /// Get a mutable reference to the inner transaction. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. + pub(crate) fn get_mut(&mut self) -> &mut mdbx::Transaction<'tx, mdbx::RW, E> { + self.tx + .as_mut() + .expect("Tried getting a mutable reference to a non-existent transaction") + } + + /// Open a new inner transaction. + pub(crate) fn open(&mut self) -> Result<(), mdbx::Error> { + self.tx = Some(self.db.begin_rw_txn()?); + Ok(()) + } + + /// Close the current inner transaction. + pub(crate) fn close(&mut self) { + self.tx.take(); + } + } +}