From c60495df7e5e31e5a65cfd4520a276e679683be6 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Wed, 4 Jan 2023 11:25:40 +0200 Subject: [PATCH] feat(sync): update sync state within pipeline (#697) * feat(sync): update sync state within pipeline * address pr comments * fix comment --- bin/reth/src/node/mod.rs | 1 + crates/stages/src/id.rs | 12 +++ crates/stages/src/pipeline.rs | 132 ++++++++++++++++++++-------- crates/stages/src/pipeline/ctrl.rs | 12 ++- crates/stages/src/stages/bodies.rs | 2 +- crates/stages/src/stages/headers.rs | 15 +++- 6 files changed, 131 insertions(+), 43 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 81d392078..91ab78422 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -119,6 +119,7 @@ impl Command { // TODO: Remove magic numbers let fetch_client = Arc::new(network.fetch_client().await?); let mut pipeline = reth_stages::Pipeline::default() + .with_sync_state_updater(network.clone()) .push(HeaderStage { downloader: headers::linear::LinearDownloadBuilder::default() .batch_size(config.stages.headers.downloader_batch_size) diff --git a/crates/stages/src/id.rs b/crates/stages/src/id.rs index 597643f90..9853053e0 100644 --- a/crates/stages/src/id.rs +++ b/crates/stages/src/id.rs @@ -1,3 +1,4 @@ +use crate::stages::{bodies::BODIES, headers::HEADERS}; use metrics::absolute_counter; use reth_db::{ tables::SyncStage, @@ -20,6 +21,11 @@ impl Display for StageId { } impl StageId { + /// Returns a flag indicating if it's a downloading stage + pub fn is_downloading_stage(&self) -> bool { + *self == HEADERS || *self == BODIES + } + /// Get the last committed progress of this stage. pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result, DbError> { tx.get::(self.0.as_bytes().to_vec()) @@ -45,4 +51,10 @@ mod tests { assert_eq!(StageId("foo").to_string(), "foo"); assert_eq!(StageId("bar").to_string(), "bar"); } + + #[test] + fn is_downloading_stage() { + assert!(HEADERS.is_downloading_stage()); + assert!(BODIES.is_downloading_stage()); + } } diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 6b2b15eee..f0602e44f 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -2,7 +2,8 @@ use crate::{ db::Transaction, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, }; -use reth_db::{database::Database, transaction::DbTx}; +use reth_db::database::Database; +use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_primitives::BlockNumber; use std::{ fmt::{Debug, Formatter}, @@ -70,29 +71,31 @@ use state::*; /// In case of a validation error (as determined by the consensus engine) in one of the stages, the /// pipeline will unwind the stages in reverse order of execution. It is also possible to /// request an unwind manually (see [Pipeline::unwind]). -pub struct Pipeline { +pub struct Pipeline { stages: Vec>, max_block: Option, events_sender: MaybeSender, + sync_state_updater: Option, } -impl Default for Pipeline { +impl Default for Pipeline { fn default() -> Self { - Self { stages: Vec::new(), max_block: None, events_sender: MaybeSender::new(None) } + Self { + stages: Vec::new(), + max_block: None, + events_sender: MaybeSender::new(None), + sync_state_updater: None, + } } } -impl Debug for Pipeline { + +impl Debug for Pipeline { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Pipeline").field("max_block", &self.max_block).finish() } } -impl Pipeline { - /// Create a new pipeline with a channel for receiving events (see [PipelineEvent]). - pub fn with_channel(sender: Sender) -> Self { - Self::default().set_channel(sender) - } - +impl Pipeline { /// Add a stage to the pipeline. pub fn push(mut self, stage: S) -> Self where @@ -105,17 +108,23 @@ impl Pipeline { /// Set the target block. /// /// Once this block is reached, syncing will stop. - pub fn set_max_block(mut self, block: Option) -> Self { + pub fn with_max_block(mut self, block: Option) -> Self { self.max_block = block; self } /// Set a channel the pipeline will transmit events over (see [PipelineEvent]). - pub fn set_channel(mut self, sender: Sender) -> Self { + pub fn with_channel(mut self, sender: Sender) -> Self { self.events_sender.set(Some(sender)); self } + /// Set a [SyncStateUpdater]. + pub fn with_sync_state_updater(mut self, updater: U) -> Self { + self.sync_state_updater = Some(updater); + self + } + /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. pub async fn run(&mut self, db: Arc) -> Result<(), PipelineError> { @@ -130,7 +139,7 @@ impl Pipeline { // Terminate the loop early if it's reached the maximum user // configured block. - if matches!(next_action, ControlFlow::Continue) && + if next_action.should_continue() && state .minimum_progress .zip(self.max_block) @@ -152,9 +161,17 @@ impl Pipeline { state: &mut PipelineState, db: &DB, ) -> Result { + let mut pipeline_progress = PipelineProgress::default(); let mut previous_stage = None; - for (_, queued_stage) in self.stages.iter_mut().enumerate() { + for queued_stage in self.stages.iter_mut() { let stage_id = queued_stage.stage.id(); + + // Update sync state + if let Some(ref updater) = self.sync_state_updater { + let state = pipeline_progress.current_sync_state(stage_id.is_downloading_stage()); + updater.update_sync_state(state); + } + trace!( target: "sync::pipeline", stage = %stage_id, @@ -166,21 +183,23 @@ impl Pipeline { .await?; match next { - ControlFlow::Continue => { - let tx = db.tx()?; - previous_stage = - Some((stage_id, stage_id.get_progress(&tx)?.unwrap_or_default())); - tx.commit()?; - } + ControlFlow::NoProgress => {} // noop + ControlFlow::Continue { progress } => pipeline_progress.update(progress), ControlFlow::Unwind { target, bad_block } => { + // reset the sync state + if let Some(ref updater) = self.sync_state_updater { + updater.update_sync_state(SyncState::Downloading { target_block: target }); + } self.unwind(db, target, bad_block).await?; - return Ok(ControlFlow::Unwind { target, bad_block }) } } + + previous_stage = + Some((stage_id, db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default())); } - Ok(ControlFlow::Continue) + Ok(pipeline_progress.next_ctrl()) } /// Unwind the stages to the target block. @@ -237,6 +256,34 @@ impl Pipeline { } } +#[derive(Debug, Default)] +struct PipelineProgress { + progress: Option, +} + +impl PipelineProgress { + fn update(&mut self, progress: u64) { + self.progress = Some(progress); + } + + /// Create a sync state from pipeline progress. + fn current_sync_state(&self, downloading: bool) -> SyncState { + match self.progress { + Some(progress) if downloading => SyncState::Downloading { target_block: progress }, + Some(progress) => SyncState::Executing { target_block: progress }, + None => SyncState::Idle, + } + } + + /// Get next control flow step + fn next_ctrl(&self) -> ControlFlow { + match self.progress { + Some(progress) => ControlFlow::Continue { progress }, + None => ControlFlow::NoProgress, + } + } +} + /// A container for a queued stage. struct QueuedStage { /// The actual stage to execute. @@ -252,6 +299,7 @@ impl QueuedStage { db: &DB, ) -> Result { let stage_id = self.stage.id(); + let mut made_progress = false; loop { let mut tx = Transaction::new(db)?; @@ -269,7 +317,7 @@ impl QueuedStage { state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // We reached the maximum block, so we skip the stage - return Ok(ControlFlow::Continue) + return Ok(ControlFlow::NoProgress) } state @@ -283,6 +331,7 @@ impl QueuedStage { .await { Ok(out @ ExecOutput { stage_progress, done }) => { + made_progress |= stage_progress != prev_progress.unwrap_or_default(); info!( target: "sync::pipeline", stage = %stage_id, @@ -303,7 +352,11 @@ impl QueuedStage { state.record_progress_outliers(stage_progress); if done { - return Ok(ControlFlow::Continue) + return Ok(if made_progress { + ControlFlow::Continue { progress: stage_progress } + } else { + ControlFlow::NoProgress + }) } } Err(err) => { @@ -353,7 +406,7 @@ mod tests { use crate::{StageId, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap}; - use reth_interfaces::consensus; + use reth_interfaces::{consensus, sync::NoopSyncStateUpdate}; use tokio::sync::mpsc::channel; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use utils::TestStage; @@ -366,7 +419,8 @@ mod tests { // Run pipeline tokio::spawn(async move { - Pipeline::>::with_channel(tx) + Pipeline::, NoopSyncStateUpdate>::default() + .with_channel(tx) .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })), @@ -375,7 +429,7 @@ mod tests { TestStage::new(StageId("B")) .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) - .set_max_block(Some(10)) + .with_max_block(Some(10)) .run(db) .await }); @@ -406,7 +460,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - let mut pipeline = Pipeline::>::default() + let mut pipeline = Pipeline::, NoopSyncStateUpdate>::default() .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 100, done: true })) @@ -422,13 +476,17 @@ mod tests { .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), ) - .set_max_block(Some(10)); + .with_max_block(Some(10)); // Sync first pipeline.run(db.clone()).await.expect("Could not run pipeline"); // Unwind - pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline"); + pipeline + .with_channel(tx) + .unwind(&db, 1, None) + .await + .expect("Could not unwind pipeline"); }); // Check that the stages were unwound in reverse order @@ -482,7 +540,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - Pipeline::>::default() + Pipeline::, NoopSyncStateUpdate>::default() .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) @@ -498,8 +556,8 @@ mod tests { .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) - .set_max_block(Some(10)) - .set_channel(tx) + .with_max_block(Some(10)) + .with_channel(tx) .run(db) .await .expect("Could not run pipeline"); @@ -543,20 +601,20 @@ mod tests { async fn pipeline_error_handling() { // Non-fatal let db = test_utils::create_test_db(EnvKind::RW); - let result = Pipeline::>::default() + let result = Pipeline::, NoopSyncStateUpdate>::default() .push( TestStage::new(StageId("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) - .set_max_block(Some(10)) + .with_max_block(Some(10)) .run(db) .await; assert_matches!(result, Ok(())); // Fatal let db = test_utils::create_test_db(EnvKind::RW); - let result = Pipeline::>::default() + let result = Pipeline::, NoopSyncStateUpdate>::default() .push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( DatabaseIntegrityError::BlockBody { number: 5 }, )))) diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 461ca6146..40edaace3 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -11,5 +11,15 @@ pub(crate) enum ControlFlow { bad_block: Option, }, /// The pipeline is allowed to continue executing stages. - Continue, + Continue { + /// The progress of the last stage + progress: u64, + }, + NoProgress, +} + +impl ControlFlow { + pub(crate) fn should_continue(&self) -> bool { + matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress) + } } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 4003dd08a..c9fb39977 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -18,7 +18,7 @@ use reth_primitives::{BlockNumber, SealedHeader}; use std::{fmt::Debug, sync::Arc}; use tracing::*; -const BODIES: StageId = StageId("Bodies"); +pub(crate) const BODIES: StageId = StageId("Bodies"); // TODO(onbjerg): Metrics and events (gradual status for e.g. CLI) /// The body stage downloads block bodies. diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 6046a206f..dd03cd6d2 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -24,7 +24,7 @@ use reth_primitives::{BlockNumber, Header, SealedHeader, H256, U256}; use std::{fmt::Debug, sync::Arc}; use tracing::*; -const HEADERS: StageId = StageId("Headers"); +pub(crate) const HEADERS: StageId = StageId("Headers"); /// The headers stage. /// @@ -76,6 +76,13 @@ impl // reverse from there. Else, it should use whatever the forkchoice state reports. let tip = match next_header { Some(header) if stage_progress + 1 != header.number => header.parent_hash, - None => self.next_fork_choice_state(&head.hash()).await.head_block_hash, + None => self.next_fork_choice_state().await.head_block_hash, _ => return Err(StageError::StageProgress(stage_progress)), }; Ok((head, tip)) } - async fn next_fork_choice_state(&self, head: &H256) -> ForkchoiceState { + async fn next_fork_choice_state(&self) -> ForkchoiceState { let mut state_rcv = self.consensus.fork_choice_state(); loop { let _ = state_rcv.changed().await; let forkchoice = state_rcv.borrow(); - if !forkchoice.head_block_hash.is_zero() && forkchoice.head_block_hash != *head { + if !forkchoice.head_block_hash.is_zero() { return forkchoice.clone() } }