chore(pipeline): extract helper types to separate files (#1842)

This commit is contained in:
Roman Krasiuk
2023-03-19 16:37:11 +02:00
committed by GitHub
parent 02cb1e3257
commit 274ab54703
3 changed files with 70 additions and 61 deletions

View File

@ -1,12 +1,9 @@
use crate::{error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use metrics::Gauge;
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_metrics_derive::Metrics;
use reth_primitives::{BlockNumber, H256};
use reth_provider::Transaction;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::Deref,
sync::Arc,
@ -18,12 +15,16 @@ use tracing::*;
mod builder;
mod ctrl;
mod event;
mod progress;
mod set;
mod sync_metrics;
pub use builder::*;
use ctrl::*;
pub use event::*;
use progress::*;
pub use set::*;
use sync_metrics::*;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
@ -368,63 +369,6 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
}
}
#[derive(Metrics)]
#[metrics(scope = "sync")]
struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
}
#[derive(Default)]
struct Metrics {
checkpoints: HashMap<StageId, StageMetrics>,
}
impl Metrics {
fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) {
self.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(progress as f64);
}
}
#[derive(Debug, Default)]
struct PipelineProgress {
/// The progress of the current stage
pub(crate) progress: Option<BlockNumber>,
/// The maximum progress achieved by any stage during the execution of the pipeline.
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
}
impl PipelineProgress {
fn update(&mut self, progress: BlockNumber) {
self.progress = Some(progress);
self.minimum_progress = opt::min(self.minimum_progress, progress);
self.maximum_progress = opt::max(self.maximum_progress, progress);
}
/// Create a sync state from pipeline progress.
fn current_sync_state(&self, downloading: bool) -> SyncState {
match self.progress {
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
Some(progress) => SyncState::Executing { target_block: progress },
None => SyncState::Idle,
}
}
/// Get next control flow step
fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}
/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

View File

@ -0,0 +1,39 @@
use super::ctrl::ControlFlow;
use crate::util::opt;
use reth_interfaces::sync::SyncState;
use reth_primitives::BlockNumber;
#[derive(Debug, Default)]
pub(crate) struct PipelineProgress {
/// The progress of the current stage
pub(crate) progress: Option<BlockNumber>,
/// The maximum progress achieved by any stage during the execution of the pipeline.
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
}
impl PipelineProgress {
pub(crate) fn update(&mut self, progress: BlockNumber) {
self.progress = Some(progress);
self.minimum_progress = opt::min(self.minimum_progress, progress);
self.maximum_progress = opt::max(self.maximum_progress, progress);
}
/// Create a sync state from pipeline progress.
pub(crate) fn current_sync_state(&self, downloading: bool) -> SyncState {
match self.progress {
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
Some(progress) => SyncState::Executing { target_block: progress },
None => SyncState::Idle,
}
}
/// Get next control flow step
pub(crate) fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}

View File

@ -0,0 +1,26 @@
use crate::StageId;
use metrics::Gauge;
use reth_metrics_derive::Metrics;
use std::collections::HashMap;
#[derive(Metrics)]
#[metrics(scope = "sync")]
pub(crate) struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
}
#[derive(Default)]
pub(crate) struct Metrics {
checkpoints: HashMap<StageId, StageMetrics>,
}
impl Metrics {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) {
self.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(progress as f64);
}
}