mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
refactor(stages): split Pipeline::run (#30)
This commit is contained in:
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
error::*,
|
||||
util::opt::{self, MaybeSender},
|
||||
ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput,
|
||||
util::{db::TxContainer, opt::MaybeSender},
|
||||
ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
|
||||
};
|
||||
use reth_db::mdbx;
|
||||
use reth_primitives::BlockNumber;
|
||||
@ -9,21 +9,13 @@ use std::fmt::{Debug, Formatter};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
|
||||
mod ctrl;
|
||||
mod event;
|
||||
pub use event::*;
|
||||
mod state;
|
||||
|
||||
struct QueuedStage<'db, E>
|
||||
where
|
||||
E: mdbx::EnvironmentKind,
|
||||
{
|
||||
/// The actual stage to execute.
|
||||
stage: Box<dyn Stage<'db, E>>,
|
||||
/// The unwind priority of the stage.
|
||||
unwind_priority: usize,
|
||||
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
|
||||
/// the chain.
|
||||
require_tip: bool,
|
||||
}
|
||||
use ctrl::*;
|
||||
pub use event::*;
|
||||
use state::*;
|
||||
|
||||
/// A staged sync pipeline.
|
||||
///
|
||||
@ -126,111 +118,70 @@ where
|
||||
|
||||
/// Run the pipeline.
|
||||
pub async fn run(&mut self, db: &'db mdbx::Environment<E>) -> Result<(), PipelineError> {
|
||||
let mut previous_stage = None;
|
||||
let mut minimum_progress: Option<BlockNumber> = None;
|
||||
let mut maximum_progress: Option<BlockNumber> = None;
|
||||
let mut reached_tip_flag = true;
|
||||
let mut state = PipelineState {
|
||||
events_sender: self.events_sender.clone(),
|
||||
max_block: self.max_block,
|
||||
maximum_progress: None,
|
||||
minimum_progress: None,
|
||||
reached_tip: true,
|
||||
};
|
||||
|
||||
'run: loop {
|
||||
let mut tx = db.begin_rw_txn()?;
|
||||
for (_, queued_stage) in self.stages.iter_mut().enumerate() {
|
||||
let stage_id = queued_stage.stage.id();
|
||||
let block_reached = loop {
|
||||
let prev_progress = stage_id.get_progress(&tx)?;
|
||||
self.events_sender
|
||||
.send(PipelineEvent::Running { stage_id, stage_progress: prev_progress })
|
||||
.await?;
|
||||
loop {
|
||||
let mut tx = TxContainer::new(db)?;
|
||||
match self.run_loop(&mut state, &mut tx).await? {
|
||||
ControlFlow::Continue => {
|
||||
tx.commit()?;
|
||||
|
||||
// Whether any stage has reached the maximum block, which also counts as having
|
||||
// reached the tip for stages that have reached the tip
|
||||
let reached_max_block = maximum_progress
|
||||
// Check if we've reached our desired target block
|
||||
if state
|
||||
.minimum_progress
|
||||
.zip(self.max_block)
|
||||
.map_or(false, |(progress, target)| progress >= target);
|
||||
|
||||
// Whether this stage reached the max block
|
||||
let stage_reached_max_block = prev_progress
|
||||
.zip(self.max_block)
|
||||
.map_or(false, |(prev_progress, target)| prev_progress >= target);
|
||||
|
||||
// Execute stage
|
||||
let output = Self::execute_stage(
|
||||
&mut tx,
|
||||
queued_stage,
|
||||
ExecInput { previous_stage, stage_progress: prev_progress },
|
||||
reached_tip_flag || reached_max_block,
|
||||
stage_reached_max_block,
|
||||
)
|
||||
.instrument(info_span!("Running", stage = %stage_id))
|
||||
.await;
|
||||
|
||||
if output.is_err() {
|
||||
self.events_sender
|
||||
.send(PipelineEvent::Ran { stage_id, result: None })
|
||||
.await?;
|
||||
.map_or(false, |(progress, target)| progress >= target)
|
||||
{
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
match output {
|
||||
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
|
||||
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
|
||||
stage_id.save_progress(&tx, stage_progress)?;
|
||||
|
||||
self.events_sender
|
||||
.send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) })
|
||||
.await?;
|
||||
|
||||
// TODO: Make the commit interval configurable
|
||||
tx.commit()?;
|
||||
tx = db.begin_rw_txn()?;
|
||||
|
||||
// Update our minimum and maximum stage progress
|
||||
minimum_progress = opt::min(minimum_progress, stage_progress);
|
||||
maximum_progress = opt::max(maximum_progress, stage_progress);
|
||||
|
||||
if done {
|
||||
reached_tip_flag = reached_tip;
|
||||
break stage_progress
|
||||
}
|
||||
}
|
||||
Err(StageError::Validation { block }) => {
|
||||
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error.");
|
||||
|
||||
// We unwind because of a validation error. If the unwind itself fails,
|
||||
// we bail entirely, otherwise we restart the execution loop from the
|
||||
// beginning.
|
||||
//
|
||||
// Note on the drop: The transaction needs to be dropped in order for
|
||||
// unwind to create a new one. Dropping the
|
||||
// transaction will abort it; there is no
|
||||
// other way currently to abort the transaction. It will be re-created
|
||||
// if the loop restarts.
|
||||
drop(tx);
|
||||
match self
|
||||
.unwind(db, prev_progress.unwrap_or_default(), Some(block))
|
||||
.await
|
||||
{
|
||||
Ok(()) => continue 'run,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(PipelineError::Stage(e)),
|
||||
}
|
||||
};
|
||||
|
||||
// Set previous stage and continue on to next stage.
|
||||
previous_stage = Some((stage_id, block_reached));
|
||||
}
|
||||
tx.commit()?;
|
||||
|
||||
// Check if we've reached our desired target block
|
||||
if minimum_progress
|
||||
.zip(self.max_block)
|
||||
.map_or(false, |(progress, target)| progress >= target)
|
||||
{
|
||||
return Ok(())
|
||||
}
|
||||
ControlFlow::Unwind { .. } => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a single stage loop.
|
||||
///
|
||||
/// A stage loop will run each stage serially.
|
||||
async fn run_loop<'tx>(
|
||||
&mut self,
|
||||
state: &mut PipelineState,
|
||||
tx: &mut TxContainer<'db, 'tx, E>,
|
||||
) -> Result<ControlFlow, PipelineError>
|
||||
where
|
||||
'db: 'tx,
|
||||
{
|
||||
let mut previous_stage = None;
|
||||
for (_, queued_stage) in self.stages.iter_mut().enumerate() {
|
||||
let stage_id = queued_stage.stage.id();
|
||||
match queued_stage
|
||||
.execute(state, previous_stage, tx)
|
||||
.instrument(info_span!("Running", stage = %stage_id))
|
||||
.await?
|
||||
{
|
||||
ControlFlow::Continue => {
|
||||
previous_stage =
|
||||
Some((stage_id, stage_id.get_progress(tx.get())?.unwrap_or_default()));
|
||||
}
|
||||
ControlFlow::Unwind { target, bad_block } => {
|
||||
// TODO: Note on close
|
||||
tx.close();
|
||||
self.unwind(tx.db, target, bad_block).await?;
|
||||
tx.open()?;
|
||||
return Ok(ControlFlow::Unwind { target, bad_block })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ControlFlow::Continue)
|
||||
}
|
||||
|
||||
/// Unwind the stages to the target block.
|
||||
///
|
||||
/// If the unwind is due to a bad block the number of that block should be specified.
|
||||
@ -311,40 +262,103 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'db, E> Pipeline<'db, E>
|
||||
/// A container for a queued stage.
|
||||
struct QueuedStage<'db, E>
|
||||
where
|
||||
E: mdbx::EnvironmentKind,
|
||||
{
|
||||
async fn execute_stage<'tx>(
|
||||
tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
|
||||
QueuedStage { stage, require_tip, .. }: &mut QueuedStage<'db, E>,
|
||||
input: ExecInput,
|
||||
reached_tip: bool,
|
||||
stage_reached_max_block: bool,
|
||||
) -> Result<ExecOutput, StageError>
|
||||
/// The actual stage to execute.
|
||||
stage: Box<dyn Stage<'db, E>>,
|
||||
/// The unwind priority of the stage.
|
||||
unwind_priority: usize,
|
||||
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
|
||||
/// the chain.
|
||||
require_tip: bool,
|
||||
}
|
||||
|
||||
impl<'db, E> QueuedStage<'db, E>
|
||||
where
|
||||
E: mdbx::EnvironmentKind,
|
||||
{
|
||||
/// Execute the stage.
|
||||
async fn execute<'tx>(
|
||||
&mut self,
|
||||
state: &mut PipelineState,
|
||||
previous_stage: Option<(StageId, BlockNumber)>,
|
||||
tx: &mut TxContainer<'db, 'tx, E>,
|
||||
) -> Result<ControlFlow, PipelineError>
|
||||
where
|
||||
'db: 'tx,
|
||||
{
|
||||
if !reached_tip && *require_tip {
|
||||
let stage_id = self.stage.id();
|
||||
if self.require_tip && !state.reached_tip() {
|
||||
info!("Tip not reached, skipping.");
|
||||
|
||||
// Stage requires us to reach the tip of the chain first, but we have
|
||||
// not.
|
||||
Ok(ExecOutput {
|
||||
stage_progress: input.stage_progress.unwrap_or_default(),
|
||||
done: true,
|
||||
reached_tip: false,
|
||||
})
|
||||
} else if stage_reached_max_block {
|
||||
info!("Stage reached maximum block, skipping.");
|
||||
// We reached the maximum block, so we skip the stage
|
||||
Ok(ExecOutput {
|
||||
stage_progress: input.stage_progress.unwrap_or_default(),
|
||||
done: true,
|
||||
reached_tip: true,
|
||||
})
|
||||
} else {
|
||||
stage.execute(tx, input).await
|
||||
return Ok(ControlFlow::Continue)
|
||||
}
|
||||
|
||||
loop {
|
||||
let prev_progress = stage_id.get_progress(tx.get())?;
|
||||
state
|
||||
.events_sender
|
||||
.send(PipelineEvent::Running { stage_id, stage_progress: prev_progress })
|
||||
.await?;
|
||||
|
||||
let stage_reached_max_block = prev_progress
|
||||
.zip(state.max_block)
|
||||
.map_or(false, |(prev_progress, target)| prev_progress >= target);
|
||||
if stage_reached_max_block {
|
||||
info!("Stage reached maximum block, skipping.");
|
||||
|
||||
// We reached the maximum block, so we skip the stage
|
||||
state.set_reached_tip(true);
|
||||
return Ok(ControlFlow::Continue)
|
||||
}
|
||||
|
||||
match self
|
||||
.stage
|
||||
.execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress })
|
||||
.await
|
||||
{
|
||||
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
|
||||
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
|
||||
stage_id.save_progress(tx.get_mut(), stage_progress)?;
|
||||
|
||||
state
|
||||
.events_sender
|
||||
.send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) })
|
||||
.await?;
|
||||
|
||||
// TODO: Make the commit interval configurable
|
||||
tx.commit()?;
|
||||
|
||||
state.record_progress_outliers(stage_progress);
|
||||
state.set_reached_tip(reached_tip);
|
||||
|
||||
if done {
|
||||
return Ok(ControlFlow::Continue)
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
state.events_sender.send(PipelineEvent::Ran { stage_id, result: None }).await?;
|
||||
|
||||
return if let StageError::Validation { block } = err {
|
||||
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error.");
|
||||
|
||||
// We unwind because of a validation error. If the unwind itself fails,
|
||||
// we bail entirely, otherwise we restart the execution loop from the
|
||||
// beginning.
|
||||
Ok(ControlFlow::Unwind {
|
||||
target: prev_progress.unwrap_or_default(),
|
||||
bad_block: Some(block),
|
||||
})
|
||||
} else {
|
||||
Err(err.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
14
crates/stages/src/pipeline/ctrl.rs
Normal file
14
crates/stages/src/pipeline/ctrl.rs
Normal file
@ -0,0 +1,14 @@
|
||||
use reth_primitives::BlockNumber;
|
||||
|
||||
/// Determines the control flow during pipeline execution.
|
||||
pub(crate) enum ControlFlow {
|
||||
/// An unwind was requested and must be performed before continuing.
|
||||
Unwind {
|
||||
/// The block to unwind to.
|
||||
target: BlockNumber,
|
||||
/// The block that caused the unwind.
|
||||
bad_block: Option<BlockNumber>,
|
||||
},
|
||||
/// The pipeline is allowed to continue executing stages.
|
||||
Continue,
|
||||
}
|
||||
43
crates/stages/src/pipeline/state.rs
Normal file
43
crates/stages/src/pipeline/state.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use crate::{
|
||||
pipeline::event::PipelineEvent,
|
||||
util::{opt, opt::MaybeSender},
|
||||
};
|
||||
use reth_primitives::BlockNumber;
|
||||
|
||||
/// The state of the pipeline during execution.
|
||||
pub(crate) struct PipelineState {
|
||||
pub(crate) events_sender: MaybeSender<PipelineEvent>,
|
||||
pub(crate) max_block: Option<BlockNumber>,
|
||||
/// The maximum progress achieved by any stage during the execution of the pipeline.
|
||||
pub(crate) maximum_progress: Option<BlockNumber>,
|
||||
/// The minimum progress achieved by any stage during the execution of the pipeline.
|
||||
pub(crate) minimum_progress: Option<BlockNumber>,
|
||||
/// Whether or not the previous stage reached the tip of the chain.
|
||||
///
|
||||
/// **Do not use this** under normal circumstances. Instead, opt for
|
||||
/// [PipelineState::reached_tip] and [PipelineState::set_reached_tip].
|
||||
pub(crate) reached_tip: bool,
|
||||
}
|
||||
|
||||
impl PipelineState {
|
||||
/// Record the progress of a stage, setting the maximum and minimum progress achieved by any
|
||||
/// stage during the execution of the pipeline.
|
||||
pub(crate) fn record_progress_outliers(&mut self, stage_progress: BlockNumber) {
|
||||
// Update our minimum and maximum stage progress
|
||||
self.minimum_progress = opt::min(self.minimum_progress, stage_progress);
|
||||
self.maximum_progress = opt::max(self.maximum_progress, stage_progress);
|
||||
}
|
||||
|
||||
/// Whether or not the pipeline reached the tip of the chain.
|
||||
pub(crate) fn reached_tip(&self) -> bool {
|
||||
self.reached_tip ||
|
||||
self.max_block
|
||||
.zip(self.minimum_progress)
|
||||
.map_or(false, |(target, progress)| progress >= target)
|
||||
}
|
||||
|
||||
/// Set whether or not the pipeline has reached the tip of the chain.
|
||||
pub(crate) fn set_reached_tip(&mut self, flag: bool) {
|
||||
self.reached_tip = flag;
|
||||
}
|
||||
}
|
||||
@ -16,7 +16,7 @@ pub(crate) mod opt {
|
||||
}
|
||||
|
||||
/// The producing side of a [tokio::mpsc] channel that may or may not be set.
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct MaybeSender<T> {
|
||||
inner: Option<Sender<T>>,
|
||||
}
|
||||
@ -61,3 +61,81 @@ pub(crate) mod opt {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod db {
|
||||
use reth_db::mdbx;
|
||||
|
||||
/// A container for a MDBX transaction that will open a new inner transaction when the current
|
||||
/// one is committed.
|
||||
// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in
|
||||
// the pipeline that just take a reference will not be able to commit their transaction and let
|
||||
// the pipeline continue. Is there a better way to do this?
|
||||
pub(crate) struct TxContainer<'db, 'tx, E>
|
||||
where
|
||||
'db: 'tx,
|
||||
E: mdbx::EnvironmentKind,
|
||||
{
|
||||
/// A handle to the MDBX database.
|
||||
pub(crate) db: &'db mdbx::Environment<E>,
|
||||
tx: Option<mdbx::Transaction<'tx, mdbx::RW, E>>,
|
||||
}
|
||||
|
||||
impl<'db, 'tx, E> TxContainer<'db, 'tx, E>
|
||||
where
|
||||
'db: 'tx,
|
||||
E: mdbx::EnvironmentKind,
|
||||
{
|
||||
/// Create a new container with the given database handle.
|
||||
///
|
||||
/// A new inner transaction will be opened.
|
||||
pub(crate) fn new(db: &'db mdbx::Environment<E>) -> Result<Self, mdbx::Error> {
|
||||
Ok(Self { db, tx: Some(db.begin_rw_txn()?) })
|
||||
}
|
||||
|
||||
/// Commit the current inner transaction and open a new one.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if an inner transaction does not exist. This should never be the case unless
|
||||
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
|
||||
pub(crate) fn commit(&mut self) -> Result<bool, mdbx::Error> {
|
||||
let success =
|
||||
self.tx.take().expect("Tried committing a non-existent transaction").commit()?;
|
||||
self.tx = Some(self.db.begin_rw_txn()?);
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
/// Get the inner transaction.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if an inner transaction does not exist. This should never be the case unless
|
||||
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
|
||||
pub(crate) fn get(&self) -> &mdbx::Transaction<'tx, mdbx::RW, E> {
|
||||
self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the inner transaction.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if an inner transaction does not exist. This should never be the case unless
|
||||
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
|
||||
pub(crate) fn get_mut(&mut self) -> &mut mdbx::Transaction<'tx, mdbx::RW, E> {
|
||||
self.tx
|
||||
.as_mut()
|
||||
.expect("Tried getting a mutable reference to a non-existent transaction")
|
||||
}
|
||||
|
||||
/// Open a new inner transaction.
|
||||
pub(crate) fn open(&mut self) -> Result<(), mdbx::Error> {
|
||||
self.tx = Some(self.db.begin_rw_txn()?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Close the current inner transaction.
|
||||
pub(crate) fn close(&mut self) {
|
||||
self.tx.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user