feat(sync): update sync state within pipeline (#697)

* feat(sync): update sync state within pipeline

* address pr comments

* fix comment
This commit is contained in:
Roman Krasiuk
2023-01-04 11:25:40 +02:00
committed by GitHub
parent 6407b5087e
commit c60495df7e
6 changed files with 131 additions and 43 deletions

View File

@ -119,6 +119,7 @@ impl Command {
// TODO: Remove magic numbers
let fetch_client = Arc::new(network.fetch_client().await?);
let mut pipeline = reth_stages::Pipeline::default()
.with_sync_state_updater(network.clone())
.push(HeaderStage {
downloader: headers::linear::LinearDownloadBuilder::default()
.batch_size(config.stages.headers.downloader_batch_size)

View File

@ -1,3 +1,4 @@
use crate::stages::{bodies::BODIES, headers::HEADERS};
use metrics::absolute_counter;
use reth_db::{
tables::SyncStage,
@ -20,6 +21,11 @@ impl Display for StageId {
}
impl StageId {
/// Returns a flag indicating if it's a downloading stage
pub fn is_downloading_stage(&self) -> bool {
*self == HEADERS || *self == BODIES
}
/// Get the last committed progress of this stage.
pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result<Option<BlockNumber>, DbError> {
tx.get::<SyncStage>(self.0.as_bytes().to_vec())
@ -45,4 +51,10 @@ mod tests {
assert_eq!(StageId("foo").to_string(), "foo");
assert_eq!(StageId("bar").to_string(), "bar");
}
#[test]
fn is_downloading_stage() {
assert!(HEADERS.is_downloading_stage());
assert!(BODIES.is_downloading_stage());
}
}

View File

@ -2,7 +2,8 @@ use crate::{
db::Transaction, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError,
StageId, UnwindInput,
};
use reth_db::{database::Database, transaction::DbTx};
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_primitives::BlockNumber;
use std::{
fmt::{Debug, Formatter},
@ -70,29 +71,31 @@ use state::*;
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [Pipeline::unwind]).
pub struct Pipeline<DB: Database> {
pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
stages: Vec<QueuedStage<DB>>,
max_block: Option<BlockNumber>,
events_sender: MaybeSender<PipelineEvent>,
sync_state_updater: Option<U>,
}
impl<DB: Database> Default for Pipeline<DB> {
impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
fn default() -> Self {
Self { stages: Vec::new(), max_block: None, events_sender: MaybeSender::new(None) }
Self {
stages: Vec::new(),
max_block: None,
events_sender: MaybeSender::new(None),
sync_state_updater: None,
}
}
impl<DB: Database> Debug for Pipeline<DB> {
}
impl<DB: Database, U: SyncStateUpdater> Debug for Pipeline<DB, U> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline").field("max_block", &self.max_block).finish()
}
}
impl<DB: Database> Pipeline<DB> {
/// Create a new pipeline with a channel for receiving events (see [PipelineEvent]).
pub fn with_channel(sender: Sender<PipelineEvent>) -> Self {
Self::default().set_channel(sender)
}
impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
/// Add a stage to the pipeline.
pub fn push<S>(mut self, stage: S) -> Self
where
@ -105,17 +108,23 @@ impl<DB: Database> Pipeline<DB> {
/// Set the target block.
///
/// Once this block is reached, syncing will stop.
pub fn set_max_block(mut self, block: Option<BlockNumber>) -> Self {
pub fn with_max_block(mut self, block: Option<BlockNumber>) -> Self {
self.max_block = block;
self
}
/// Set a channel the pipeline will transmit events over (see [PipelineEvent]).
pub fn set_channel(mut self, sender: Sender<PipelineEvent>) -> Self {
pub fn with_channel(mut self, sender: Sender<PipelineEvent>) -> Self {
self.events_sender.set(Some(sender));
self
}
/// Set a [SyncStateUpdater].
pub fn with_sync_state_updater(mut self, updater: U) -> Self {
self.sync_state_updater = Some(updater);
self
}
/// Run the pipeline in an infinite loop. Will terminate early if the user has specified
/// a `max_block` in the pipeline.
pub async fn run(&mut self, db: Arc<DB>) -> Result<(), PipelineError> {
@ -130,7 +139,7 @@ impl<DB: Database> Pipeline<DB> {
// Terminate the loop early if it's reached the maximum user
// configured block.
if matches!(next_action, ControlFlow::Continue) &&
if next_action.should_continue() &&
state
.minimum_progress
.zip(self.max_block)
@ -152,9 +161,17 @@ impl<DB: Database> Pipeline<DB> {
state: &mut PipelineState,
db: &DB,
) -> Result<ControlFlow, PipelineError> {
let mut pipeline_progress = PipelineProgress::default();
let mut previous_stage = None;
for (_, queued_stage) in self.stages.iter_mut().enumerate() {
for queued_stage in self.stages.iter_mut() {
let stage_id = queued_stage.stage.id();
// Update sync state
if let Some(ref updater) = self.sync_state_updater {
let state = pipeline_progress.current_sync_state(stage_id.is_downloading_stage());
updater.update_sync_state(state);
}
trace!(
target: "sync::pipeline",
stage = %stage_id,
@ -166,21 +183,23 @@ impl<DB: Database> Pipeline<DB> {
.await?;
match next {
ControlFlow::Continue => {
let tx = db.tx()?;
previous_stage =
Some((stage_id, stage_id.get_progress(&tx)?.unwrap_or_default()));
tx.commit()?;
}
ControlFlow::NoProgress => {} // noop
ControlFlow::Continue { progress } => pipeline_progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
// reset the sync state
if let Some(ref updater) = self.sync_state_updater {
updater.update_sync_state(SyncState::Downloading { target_block: target });
}
self.unwind(db, target, bad_block).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
}
previous_stage =
Some((stage_id, db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default()));
}
Ok(ControlFlow::Continue)
Ok(pipeline_progress.next_ctrl())
}
/// Unwind the stages to the target block.
@ -237,6 +256,34 @@ impl<DB: Database> Pipeline<DB> {
}
}
#[derive(Debug, Default)]
struct PipelineProgress {
progress: Option<u64>,
}
impl PipelineProgress {
fn update(&mut self, progress: u64) {
self.progress = Some(progress);
}
/// Create a sync state from pipeline progress.
fn current_sync_state(&self, downloading: bool) -> SyncState {
match self.progress {
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
Some(progress) => SyncState::Executing { target_block: progress },
None => SyncState::Idle,
}
}
/// Get next control flow step
fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress,
}
}
}
/// A container for a queued stage.
struct QueuedStage<DB: Database> {
/// The actual stage to execute.
@ -252,6 +299,7 @@ impl<DB: Database> QueuedStage<DB> {
db: &DB,
) -> Result<ControlFlow, PipelineError> {
let stage_id = self.stage.id();
let mut made_progress = false;
loop {
let mut tx = Transaction::new(db)?;
@ -269,7 +317,7 @@ impl<DB: Database> QueuedStage<DB> {
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// We reached the maximum block, so we skip the stage
return Ok(ControlFlow::Continue)
return Ok(ControlFlow::NoProgress)
}
state
@ -283,6 +331,7 @@ impl<DB: Database> QueuedStage<DB> {
.await
{
Ok(out @ ExecOutput { stage_progress, done }) => {
made_progress |= stage_progress != prev_progress.unwrap_or_default();
info!(
target: "sync::pipeline",
stage = %stage_id,
@ -303,7 +352,11 @@ impl<DB: Database> QueuedStage<DB> {
state.record_progress_outliers(stage_progress);
if done {
return Ok(ControlFlow::Continue)
return Ok(if made_progress {
ControlFlow::Continue { progress: stage_progress }
} else {
ControlFlow::NoProgress
})
}
}
Err(err) => {
@ -353,7 +406,7 @@ mod tests {
use crate::{StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap};
use reth_interfaces::consensus;
use reth_interfaces::{consensus, sync::NoopSyncStateUpdate};
use tokio::sync::mpsc::channel;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use utils::TestStage;
@ -366,7 +419,8 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
Pipeline::<Env<WriteMap>>::with_channel(tx)
Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
.with_channel(tx)
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
@ -375,7 +429,7 @@ mod tests {
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.with_max_block(Some(10))
.run(db)
.await
});
@ -406,7 +460,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>>::default()
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>, NoopSyncStateUpdate>::default()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
@ -422,13 +476,17 @@ mod tests {
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
)
.set_max_block(Some(10));
.with_max_block(Some(10));
// Sync first
pipeline.run(db.clone()).await.expect("Could not run pipeline");
// Unwind
pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline");
pipeline
.with_channel(tx)
.unwind(&db, 1, None)
.await
.expect("Could not unwind pipeline");
});
// Check that the stages were unwound in reverse order
@ -482,7 +540,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
Pipeline::<Env<mdbx::WriteMap>>::default()
Pipeline::<Env<mdbx::WriteMap>, NoopSyncStateUpdate>::default()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
@ -498,8 +556,8 @@ mod tests {
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.set_channel(tx)
.with_max_block(Some(10))
.with_channel(tx)
.run(db)
.await
.expect("Could not run pipeline");
@ -543,20 +601,20 @@ mod tests {
async fn pipeline_error_handling() {
// Non-fatal
let db = test_utils::create_test_db(EnvKind::RW);
let result = Pipeline::<Env<WriteMap>>::default()
let result = Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
.push(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.with_max_block(Some(10))
.run(db)
.await;
assert_matches!(result, Ok(()));
// Fatal
let db = test_utils::create_test_db(EnvKind::RW);
let result = Pipeline::<Env<WriteMap>>::default()
let result = Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
.push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity(
DatabaseIntegrityError::BlockBody { number: 5 },
))))

View File

@ -11,5 +11,15 @@ pub(crate) enum ControlFlow {
bad_block: Option<BlockNumber>,
},
/// The pipeline is allowed to continue executing stages.
Continue,
Continue {
/// The progress of the last stage
progress: u64,
},
NoProgress,
}
impl ControlFlow {
pub(crate) fn should_continue(&self) -> bool {
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress)
}
}

View File

@ -18,7 +18,7 @@ use reth_primitives::{BlockNumber, SealedHeader};
use std::{fmt::Debug, sync::Arc};
use tracing::*;
const BODIES: StageId = StageId("Bodies");
pub(crate) const BODIES: StageId = StageId("Bodies");
// TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
/// The body stage downloads block bodies.

View File

@ -24,7 +24,7 @@ use reth_primitives::{BlockNumber, Header, SealedHeader, H256, U256};
use std::{fmt::Debug, sync::Arc};
use tracing::*;
const HEADERS: StageId = StageId("Headers");
pub(crate) const HEADERS: StageId = StageId("Headers");
/// The headers stage.
///
@ -76,6 +76,13 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
// Lookup the head and tip of the sync range
let (head, tip) = self.get_head_and_tip(tx, current_progress).await?;
// Nothing to sync
if head.hash() == tip {
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { stage_progress: current_progress, done: true })
}
debug!(target: "sync::stages::headers", ?tip, head = ?head.hash(), "Commencing sync");
// The downloader returns the headers in descending order starting from the tip
@ -214,19 +221,19 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
// reverse from there. Else, it should use whatever the forkchoice state reports.
let tip = match next_header {
Some(header) if stage_progress + 1 != header.number => header.parent_hash,
None => self.next_fork_choice_state(&head.hash()).await.head_block_hash,
None => self.next_fork_choice_state().await.head_block_hash,
_ => return Err(StageError::StageProgress(stage_progress)),
};
Ok((head, tip))
}
async fn next_fork_choice_state(&self, head: &H256) -> ForkchoiceState {
async fn next_fork_choice_state(&self) -> ForkchoiceState {
let mut state_rcv = self.consensus.fork_choice_state();
loop {
let _ = state_rcv.changed().await;
let forkchoice = state_rcv.borrow();
if !forkchoice.head_block_hash.is_zero() && forkchoice.head_block_hash != *head {
if !forkchoice.head_block_hash.is_zero() {
return forkchoice.clone()
}
}