mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 19:09:54 +00:00)

feat: pipeline builder (#1017)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
@@ -773,7 +773,7 @@ struct HeadersResponseError {
 }

 /// The block to which we want to close the gap: (local head...sync target]
-#[derive(Debug)]
+#[derive(Debug, Default)]
 struct SyncTargetBlock {
     /// Block hash of the targeted block
     hash: H256,
@@ -943,7 +943,6 @@ mod tests {

        let mut downloader = LinearDownloadBuilder::default()
            .build(Arc::new(TestConsensus::default()), Arc::clone(&client));

        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(H256::random()));

@@ -1015,6 +1014,7 @@ mod tests {
        .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(H256::random()));

        downloader.next_request_block_number = start;

        let mut total = 0;

@@ -12,7 +12,6 @@ use reth_primitives::{ChainSpec, ForkFilter, NodeRecord, PeerId, MAINNET};
 use reth_provider::{BlockProvider, HeaderProvider};
 use reth_tasks::TaskExecutor;
 use secp256k1::{SecretKey, SECP256K1};
-use serde::{Deserialize, Serialize};
 use std::{
     collections::HashSet,
     net::{Ipv4Addr, SocketAddr, SocketAddrV4},
@@ -120,7 +119,8 @@ where
 }

 /// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug)]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
 #[allow(missing_docs)]
 pub struct NetworkConfigBuilder {
     /// The node's secret key, from which the node's identity is derived.
@@ -338,7 +338,8 @@ impl NetworkConfigBuilder {
 /// This affects block propagation in the `eth` sub-protocol [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
 ///
 /// In POS `NewBlockHashes` and `NewBlock` messages become invalid.
-#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
 pub enum NetworkMode {
     /// Network is in proof-of-work mode.
     Work,

@@ -24,16 +24,20 @@ reth-provider = { path = "../storage/provider" }

# async
tokio = { version = "1.21.2", features = ["sync"] }
tokio-stream = "0.1.10"
async-trait = "0.1.57"
thiserror = "1.0.37"
tracing = "0.1.36"
aquamarine = "0.1.12"
metrics = "0.20.1"
futures-util = "0.3.25"

# observability
tracing = "0.1.36"
metrics = "0.20.1"

# misc
serde = { version = "1.0", optional = true }
thiserror = "1.0.37"
aquamarine = "0.1.12"
itertools = "0.10.5"
rayon = "1.6.0"

[dev-dependencies]
# reth
reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
@@ -41,8 +45,11 @@ reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" }
reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody]
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tempfile = "3.3.0"
assert_matches = "1.5.0"
rand = "0.8.5"
paste = "1.0"

[features]
default = ["serde"]
serde = ["dep:serde"]
@@ -1,4 +1,4 @@
-use crate::stages::{bodies::BODIES, headers::HEADERS};
+use crate::stages::{BODIES, HEADERS};
 use metrics::absolute_counter;
 use reth_db::{
     tables::SyncStage,
@@ -11,7 +11,7 @@ use std::fmt::Display;
 /// The ID of a stage.
 ///
 /// Each stage ID must be unique.
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 pub struct StageId(pub &'static str);

 impl Display for StageId {
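The new `Hash` derive on `StageId` appears to line up with the `StageSetBuilder` introduced later in this diff, which keys stages by their ID in a `HashMap<StageId, StageEntry<DB>>`. A minimal illustration of what the derive enables (hypothetical values, not from this commit):

    use std::collections::HashMap;

    // StageId is Copy + Eq + Hash, so it can now key a map of per-stage data.
    let mut progress: HashMap<StageId, u64> = HashMap::new();
    progress.insert(StageId("Headers"), 16_000_000);
    assert_eq!(progress.get(&StageId("Headers")), Some(&16_000_000));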
@@ -6,14 +6,48 @@
 ))]
 //! Staged syncing primitives for reth.
 //!
-//! See [Stage] and [Pipeline].
+//! This crate contains the syncing primitives [`Pipeline`] and [`Stage`], as well as all stages
+//! that reth uses to sync.
 //!
-//! # Metrics
+//! A pipeline can be configured using [`Pipeline::builder()`].
 //!
-//! This library exposes metrics via the [`metrics`][metrics_core] crate:
+//! For ease of use, this crate also exposes a set of [`StageSet`]s, which are collections of stages
+//! that perform specific functions during sync. Stage sets can be customized; it is possible to
+//! add, disable and replace stages in the set.
 //!
-//! - `stage_progress{stage}`: The block number each stage has currently reached.
+//! # Examples
+//!
+//! ```
+//! # use std::sync::Arc;
+//! # use reth_db::mdbx::test_utils::create_test_rw_db;
+//! # use reth_db::mdbx::{Env, WriteMap};
+//! # use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
+//! # use reth_downloaders::headers::linear::LinearDownloadBuilder;
+//! # use reth_interfaces::consensus::Consensus;
+//! # use reth_interfaces::sync::NoopSyncStateUpdate;
+//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient};
+//! # use reth_primitives::PeerId;
+//! # use reth_stages::Pipeline;
+//! # use reth_stages::sets::DefaultStages;
+//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
+//! # let headers_downloader = LinearDownloadBuilder::default().build(
+//! #    consensus.clone(),
+//! #    Arc::new(TestHeadersClient::default())
+//! # );
+//! # let bodies_downloader = ConcurrentDownloaderBuilder::default().build(
+//! #    Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
+//! #    consensus.clone(),
+//! #    create_test_rw_db()
+//! # );
+//! // Create a pipeline that can fully sync
+//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
+//! Pipeline::builder()
+//!     .add_stages(
+//!         DefaultStages::new(consensus, headers_downloader, bodies_downloader)
+//!     )
+//!     .build();
+//! #
+//! ```
 mod db;
 mod error;
 mod id;
@@ -24,9 +58,14 @@ mod util;
 #[cfg(test)]
 mod test_utils;

+/// A re-export of common structs and traits.
+pub mod prelude;
+
 /// Implementations of stages.
 pub mod stages;

+pub mod sets;
+
 pub use db::Transaction;
 pub use error::*;
 pub use id::*;

crates/stages/src/pipeline/builder.rs (new file, 69 lines)
@@ -0,0 +1,69 @@
use crate::{pipeline::QueuedStage, Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_primitives::BlockNumber;

/// Builds a [`Pipeline`].
#[derive(Debug)]
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB, U = NoopSyncStateUpdate>
where
    DB: Database,
    U: SyncStateUpdater,
{
    pipeline: Pipeline<DB, U>,
}

impl<DB: Database, U: SyncStateUpdater> Default for PipelineBuilder<DB, U> {
    fn default() -> Self {
        Self { pipeline: Pipeline::default() }
    }
}

impl<DB, U> PipelineBuilder<DB, U>
where
    DB: Database,
    U: SyncStateUpdater,
{
    /// Add a stage to the pipeline.
    pub fn add_stage<S>(mut self, stage: S) -> Self
    where
        S: Stage<DB> + 'static,
    {
        self.pipeline.stages.push(QueuedStage { stage: Box::new(stage) });
        self
    }

    /// Add a set of stages to the pipeline.
    ///
    /// Stages can be grouped into a set by using a [`StageSet`].
    ///
    /// To customize the stages in the set (reorder, disable, insert a stage) call
    /// [`build`][StageSet::build] on the set which will convert it to a
    /// [`StageSetBuilder`][crate::StageSetBuilder].
    pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
        for stage in set.builder().build() {
            self.pipeline.stages.push(QueuedStage { stage });
        }
        self
    }

    /// Set the target block.
    ///
    /// Once this block is reached, the pipeline will stop.
    pub fn with_max_block(mut self, block: BlockNumber) -> Self {
        self.pipeline.max_block = Some(block);
        self
    }

    /// Set a [SyncStateUpdater].
    pub fn with_sync_state_updater(mut self, updater: U) -> Self {
        self.pipeline.sync_state_updater = Some(updater);
        self
    }

    /// Builds the final [`Pipeline`].
    pub fn build(self) -> Pipeline<DB, U> {
        self.pipeline
    }
}
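For orientation, a minimal sketch of the call pattern this builder is meant to support; `StageA` and `StageB` are hypothetical `Stage<DB>` implementations, not part of this diff:

    // Hypothetical stages wired through the new fluent API.
    let pipeline = Pipeline::builder()
        .add_stage(StageA)          // executed first
        .add_stage(StageB)          // executed second
        .with_max_block(10_000)     // the pipeline stops once this block is reached
        .build();                   // hands back the configured Pipeline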
@@ -3,6 +3,8 @@ use crate::{
     stage::{ExecOutput, UnwindInput, UnwindOutput},
 };
 use reth_primitives::BlockNumber;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;

 /// An event emitted by a [Pipeline][crate::Pipeline].
 ///
@@ -56,3 +58,27 @@ pub enum PipelineEvent {
         stage_id: StageId,
     },
 }
+
+/// Bundles all listeners for [`PipelineEvent`]s
+// TODO: Make this a generic utility since the same struct exists in `reth/crates/net/network/src/manager.rs` and sort of in `https://github.com/paradigmxyz/reth/blob/01cb6c07df3205ee2bb55853d39302a7dfefc912/crates/net/discv4/src/lib.rs#L662-L671`
+#[derive(Default, Clone, Debug)]
+pub(crate) struct PipelineEventListeners {
+    /// All listeners for events
+    listeners: Vec<mpsc::UnboundedSender<PipelineEvent>>,
+}
+
+impl PipelineEventListeners {
+    /// Send an event to all listeners.
+    ///
+    /// Channels that were closed are removed.
+    pub(crate) fn notify(&mut self, event: PipelineEvent) {
+        self.listeners.retain(|listener| listener.send(event.clone()).is_ok())
+    }
+
+    /// Add a new event listener.
+    pub(crate) fn new_listener(&mut self) -> UnboundedReceiverStream<PipelineEvent> {
+        let (sender, receiver) = mpsc::unbounded_channel();
+        self.listeners.push(sender);
+        UnboundedReceiverStream::new(receiver)
+    }
+}

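The listener mechanism above is small enough to restate as a self-contained analogue (a sketch assuming the `tokio` and `tokio-stream` crates; this is not reth code):

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    struct Listeners<T: Clone> {
        senders: Vec<mpsc::UnboundedSender<T>>,
    }

    impl<T: Clone> Listeners<T> {
        fn new() -> Self {
            Self { senders: Vec::new() }
        }

        /// Hand out a stream of future events.
        fn subscribe(&mut self) -> UnboundedReceiverStream<T> {
            let (tx, rx) = mpsc::unbounded_channel();
            self.senders.push(tx);
            UnboundedReceiverStream::new(rx)
        }

        /// Broadcast an event, pruning any channel whose receiver was dropped.
        fn notify(&mut self, event: T) {
            self.senders.retain(|tx| tx.send(event.clone()).is_ok());
        }
    }

Unlike the bounded channel behind the `MaybeSender` this commit removes, sending on an unbounded channel never awaits, which is why `notify` can be a plain synchronous method.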
@@ -1,6 +1,5 @@
 use crate::{
-    db::Transaction, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError,
-    StageId, UnwindInput,
+    db::Transaction, error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
 };
 use reth_db::database::Database;
 use reth_interfaces::sync::{SyncState, SyncStateUpdater};
@@ -10,15 +9,19 @@ use std::{
     ops::Deref,
     sync::Arc,
 };
-use tokio::sync::mpsc::Sender;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::*;

+mod builder;
 mod ctrl;
 mod event;
+mod set;
 mod state;

+pub use builder::*;
 use ctrl::*;
 pub use event::*;
+pub use set::*;
 use state::*;

 #[cfg_attr(doc, aquamarine::aquamarine)]
@@ -74,7 +77,7 @@ use state::*;
 pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
     stages: Vec<QueuedStage<DB>>,
     max_block: Option<BlockNumber>,
-    events_sender: MaybeSender<PipelineEvent>,
+    listeners: PipelineEventListeners,
     sync_state_updater: Option<U>,
 }

@@ -83,7 +86,7 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
         Self {
             stages: Vec::new(),
             max_block: None,
-            events_sender: MaybeSender::new(None),
+            listeners: PipelineEventListeners::default(),
             sync_state_updater: None,
         }
     }
@@ -91,38 +94,25 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {

 impl<DB: Database, U: SyncStateUpdater> Debug for Pipeline<DB, U> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Pipeline").field("max_block", &self.max_block).finish()
+        f.debug_struct("Pipeline")
+            .field(
+                "stages",
+                &self.stages.iter().map(|stage| stage.stage.id()).collect::<Vec<StageId>>(),
+            )
+            .field("max_block", &self.max_block)
+            .finish()
     }
 }

 impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
-    /// Add a stage to the pipeline.
-    pub fn push<S>(mut self, stage: S) -> Self
-    where
-        S: Stage<DB> + 'static,
-    {
-        self.stages.push(QueuedStage { stage: Box::new(stage) });
-        self
+    /// Construct a pipeline using a [`PipelineBuilder`].
+    pub fn builder() -> PipelineBuilder<DB, U> {
+        PipelineBuilder::default()
     }

-    /// Set the target block.
-    ///
-    /// Once this block is reached, syncing will stop.
-    pub fn with_max_block(mut self, block: Option<BlockNumber>) -> Self {
-        self.max_block = block;
-        self
-    }
-
-    /// Set a channel the pipeline will transmit events over (see [PipelineEvent]).
-    pub fn with_channel(mut self, sender: Sender<PipelineEvent>) -> Self {
-        self.events_sender.set(Some(sender));
-        self
-    }
-
-    /// Set a [SyncStateUpdater].
-    pub fn with_sync_state_updater(mut self, updater: U) -> Self {
-        self.sync_state_updater = Some(updater);
-        self
+    /// Listen for events on the pipeline.
+    pub fn events(&mut self) -> UnboundedReceiverStream<PipelineEvent> {
+        self.listeners.new_listener()
     }

     /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
@@ -130,10 +120,9 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
     pub async fn run(&mut self, db: Arc<DB>) -> Result<(), PipelineError> {
         loop {
             let mut state = PipelineState {
-                events_sender: self.events_sender.clone(),
+                listeners: self.listeners.clone(),
                 max_block: self.max_block,
-                maximum_progress: None,
-                minimum_progress: None,
+                ..Default::default()
             };
             let next_action = self.run_loop(&mut state, db.as_ref()).await?;

@@ -224,14 +213,14 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
         let mut stage_progress = stage_id.get_progress(tx.deref())?.unwrap_or_default();
         if stage_progress < to {
             debug!(from = %stage_progress, %to, "Unwind point too far for stage");
-            self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
+            self.listeners.notify(PipelineEvent::Skipped { stage_id });
             return Ok(())
         }

         debug!(from = %stage_progress, %to, ?bad_block, "Starting unwind");
         while stage_progress > to {
             let input = UnwindInput { stage_progress, unwind_to: to, bad_block };
-            self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?;
+            self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });

             let output = stage.unwind(&mut tx, input).await;
             match output {
@@ -239,12 +228,11 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
                     stage_progress = unwind_output.stage_progress;
                     stage_id.save_progress(tx.deref(), stage_progress)?;

-                    self.events_sender
-                        .send(PipelineEvent::Unwound { stage_id, result: unwind_output })
-                        .await?;
+                    self.listeners
+                        .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
                 }
                 Err(err) => {
-                    self.events_sender.send(PipelineEvent::Error { stage_id }).await?;
+                    self.listeners.notify(PipelineEvent::Error { stage_id });
                     return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                 }
             }
@@ -314,16 +302,15 @@ impl<DB: Database> QueuedStage<DB> {
                 stage = %stage_id,
                 "Stage reached maximum block, skipping."
             );
-            state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
+            state.listeners.notify(PipelineEvent::Skipped { stage_id });

             // We reached the maximum block, so we skip the stage
             return Ok(ControlFlow::NoProgress)
         }

         state
-            .events_sender
-            .send(PipelineEvent::Running { stage_id, stage_progress: prev_progress })
-            .await?;
+            .listeners
+            .notify(PipelineEvent::Running { stage_id, stage_progress: prev_progress });

         match self
             .stage
@@ -341,10 +328,7 @@ impl<DB: Database> QueuedStage<DB> {
                 );
                 stage_id.save_progress(tx.deref(), stage_progress)?;

-                state
-                    .events_sender
-                    .send(PipelineEvent::Ran { stage_id, result: out.clone() })
-                    .await?;
+                state.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() });

                 // TODO: Make the commit interval configurable
                 tx.commit()?;
@@ -360,7 +344,7 @@ impl<DB: Database> QueuedStage<DB> {
                 }
             }
             Err(err) => {
-                state.events_sender.send(PipelineEvent::Error { stage_id }).await?;
+                state.listeners.notify(PipelineEvent::Error { stage_id });

                 return if let StageError::Validation { block, error } = err {
                     warn!(
@@ -405,38 +389,35 @@ mod tests {
     use super::*;
     use crate::{StageId, UnwindOutput};
     use assert_matches::assert_matches;
-    use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap};
+    use reth_db::mdbx::{self, test_utils, EnvKind};
     use reth_interfaces::{consensus, sync::NoopSyncStateUpdate};
-    use tokio::sync::mpsc::channel;
-    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+    use tokio_stream::StreamExt;
     use utils::TestStage;

     /// Runs a simple pipeline.
     #[tokio::test]
     async fn run_pipeline() {
-        let (tx, rx) = channel(2);
-        let db = test_utils::create_test_db(EnvKind::RW);
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+
+        let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
+            .add_stage(
+                TestStage::new(StageId("A"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
+            )
+            .add_stage(
+                TestStage::new(StageId("B"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
+            )
+            .with_max_block(10)
+            .build();
+        let events = pipeline.events();

         // Run pipeline
-        tokio::spawn(async move {
-            Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
-                .with_channel(tx)
-                .push(
-                    TestStage::new(StageId("A"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
-                )
-                .push(
-                    TestStage::new(StageId("B"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
-                )
-                .with_max_block(Some(10))
-                .run(db)
-                .await
-        });
+        tokio::spawn(async move { pipeline.run(db).await });

         // Check that the stages were run in order
         assert_eq!(
-            ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
+            events.collect::<Vec<PipelineEvent>>().await,
             vec![
                 PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
                 PipelineEvent::Ran {
@@ -455,44 +436,58 @@ mod tests {
     /// Unwinds a simple pipeline.
     #[tokio::test]
     async fn unwind_pipeline() {
-        let (tx, rx) = channel(2);
-        let db = test_utils::create_test_db(EnvKind::RW);
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+
+        let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
+            .add_stage(
+                TestStage::new(StageId("A"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
+                    .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
+            )
+            .add_stage(
+                TestStage::new(StageId("B"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
+                    .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
+            )
+            .add_stage(
+                TestStage::new(StageId("C"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 20, done: true }))
+                    .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
+            )
+            .with_max_block(10)
+            .build();
+        let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
-            let mut pipeline = Pipeline::<Env<mdbx::WriteMap>, NoopSyncStateUpdate>::default()
-                .push(
-                    TestStage::new(StageId("A"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
-                        .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
-                )
-                .push(
-                    TestStage::new(StageId("B"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
-                        .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
-                )
-                .push(
-                    TestStage::new(StageId("C"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 20, done: true }))
-                        .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
-                )
-                .with_max_block(Some(10));
-
             // Sync first
             pipeline.run(db.clone()).await.expect("Could not run pipeline");

             // Unwind
-            pipeline
-                .with_channel(tx)
-                .unwind(&db, 1, None)
-                .await
-                .expect("Could not unwind pipeline");
+            pipeline.unwind(&db, 1, None).await.expect("Could not unwind pipeline");
         });

         // Check that the stages were unwound in reverse order
         assert_eq!(
-            ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
+            events.collect::<Vec<PipelineEvent>>().await,
             vec![
                 // Executing
                 PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
                 PipelineEvent::Ran {
                     stage_id: StageId("A"),
                     result: ExecOutput { stage_progress: 100, done: true },
                 },
                 PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
                 PipelineEvent::Ran {
                     stage_id: StageId("B"),
                     result: ExecOutput { stage_progress: 10, done: true },
                 },
                 PipelineEvent::Running { stage_id: StageId("C"), stage_progress: None },
                 PipelineEvent::Ran {
                     stage_id: StageId("C"),
                     result: ExecOutput { stage_progress: 20, done: true },
                 },
                 // Unwinding
                 PipelineEvent::Unwinding {
                     stage_id: StageId("C"),
                     input: UnwindInput { stage_progress: 20, unwind_to: 1, bad_block: None }
@@ -535,37 +530,36 @@ mod tests {
     /// - The pipeline finishes
     #[tokio::test]
    async fn run_pipeline_with_unwind() {
-        let (tx, rx) = channel(2);
-        let db = test_utils::create_test_db(EnvKind::RW);
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+
+        let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
+            .add_stage(
+                TestStage::new(StageId("A"))
+                    .add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
+                    .add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
+                    .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
+            )
+            .add_stage(
+                TestStage::new(StageId("B"))
+                    .add_exec(Err(StageError::Validation {
+                        block: 5,
+                        error: consensus::Error::BaseFeeMissing,
+                    }))
+                    .add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
+                    .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
+            )
+            .with_max_block(10)
+            .build();
+        let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
-            Pipeline::<Env<mdbx::WriteMap>, NoopSyncStateUpdate>::default()
-                .push(
-                    TestStage::new(StageId("A"))
-                        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
-                        .add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
-                        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
-                )
-                .push(
-                    TestStage::new(StageId("B"))
-                        .add_exec(Err(StageError::Validation {
-                            block: 5,
-                            error: consensus::Error::BaseFeeMissing,
-                        }))
-                        .add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
-                        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
-                )
-                .with_max_block(Some(10))
-                .with_channel(tx)
-                .run(db)
-                .await
-                .expect("Could not run pipeline");
+            pipeline.run(db).await.expect("Could not run pipeline");
         });

         // Check that the stages were unwound in reverse order
         assert_eq!(
-            ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
+            events.collect::<Vec<PipelineEvent>>().await,
             vec![
                 PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
                 PipelineEvent::Ran {
@@ -600,26 +594,26 @@ mod tests {
     #[tokio::test]
     async fn pipeline_error_handling() {
         // Non-fatal
-        let db = test_utils::create_test_db(EnvKind::RW);
-        let result = Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
-            .push(
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+        let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
+            .add_stage(
                 TestStage::new(StageId("NonFatal"))
                     .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
                     .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
             )
-            .with_max_block(Some(10))
-            .run(db)
-            .await;
+            .with_max_block(10)
+            .build();
+        let result = pipeline.run(db).await;
         assert_matches!(result, Ok(()));

         // Fatal
-        let db = test_utils::create_test_db(EnvKind::RW);
-        let result = Pipeline::<Env<WriteMap>, NoopSyncStateUpdate>::default()
-            .push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity(
-                DatabaseIntegrityError::BlockBody { number: 5 },
-            ))))
-            .run(db)
-            .await;
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+        let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
+            .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err(
+                StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { number: 5 }),
+            )))
+            .build();
+        let result = pipeline.run(db).await;
         assert_matches!(
             result,
             Err(PipelineError::Stage(StageError::DatabaseIntegrity(

crates/stages/src/pipeline/set.rs (new file, 207 lines)
@@ -0,0 +1,207 @@
use crate::{Stage, StageId};
use reth_db::database::Database;
use std::{
    collections::HashMap,
    fmt::{Debug, Formatter},
};

/// Combines multiple [`Stage`]s into a single unit.
///
/// A [`StageSet`] is a logical chunk of stages that depend on each other. It is up to the
/// individual stage sets to determine what kind of configuration they expose.
///
/// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`].
pub trait StageSet<DB: Database>: Sized {
    /// Configures the stages in the set.
    fn builder(self) -> StageSetBuilder<DB>;

    /// Overrides the given [`Stage`], if it is in this set.
    ///
    /// # Panics
    ///
    /// Panics if the [`Stage`] is not in this set.
    fn set<S: Stage<DB> + 'static>(self, stage: S) -> StageSetBuilder<DB> {
        self.builder().set(stage)
    }
}

struct StageEntry<DB> {
    stage: Box<dyn Stage<DB>>,
    enabled: bool,
}

impl<DB: Database> Debug for StageEntry<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StageEntry")
            .field("stage", &self.stage.id())
            .field("enabled", &self.enabled)
            .finish()
    }
}

/// Helper to create and configure a [`StageSet`].
///
/// The builder provides ordering helpers to ensure that stages that depend on each other are added
/// to the final sync pipeline before/after their dependencies.
///
/// Stages inside the set can be disabled, enabled, overridden and reordered.
pub struct StageSetBuilder<DB> {
    stages: HashMap<StageId, StageEntry<DB>>,
    order: Vec<StageId>,
}

impl<DB: Database> Default for StageSetBuilder<DB> {
    fn default() -> Self {
        Self { stages: HashMap::new(), order: Vec::new() }
    }
}

impl<DB: Database> Debug for StageSetBuilder<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StageSetBuilder")
            .field("stages", &self.stages)
            .field("order", &self.order)
            .finish()
    }
}

impl<DB> StageSetBuilder<DB>
where
    DB: Database,
{
    fn index_of(&self, stage_id: StageId) -> usize {
        let index = self.order.iter().position(|&id| id == stage_id);

        index.unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}"))
    }

    fn upsert_stage_state(&mut self, stage: Box<dyn Stage<DB>>, added_at_index: usize) {
        let stage_id = stage.id();
        if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() {
            if let Some(to_remove) = self
                .order
                .iter()
                .enumerate()
                .find(|(i, id)| *i != added_at_index && **id == stage_id)
                .map(|(i, _)| i)
            {
                self.order.remove(to_remove);
            }
        }
    }

    /// Overrides the given [`Stage`], if it is in this set.
    ///
    /// # Panics
    ///
    /// Panics if the [`Stage`] is not in this set.
    pub fn set<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
        let entry = self
            .stages
            .get_mut(&stage.id())
            .unwrap_or_else(|| panic!("Stage does not exist in set: {}", stage.id()));
        entry.stage = Box::new(stage);
        self
    }

    /// Adds the given [`Stage`] at the end of this set.
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    pub fn add_stage<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
        let target_index = self.order.len();
        self.order.push(stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Adds the given [`StageSet`] to the end of this set.
    ///
    /// If a stage is in both sets, it is removed from its previous place in this set. Because of
    /// this, it is advisable to merge sets first and re-order stages after if needed.
    pub fn add_set<Set: StageSet<DB>>(mut self, set: Set) -> Self {
        for stage in set.builder().build() {
            let target_index = self.order.len();
            self.order.push(stage.id());
            self.upsert_stage_state(stage, target_index);
        }
        self
    }

    /// Adds the given [`Stage`] before the stage with the given [`StageId`].
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    ///
    /// # Panics
    ///
    /// Panics if the dependency stage is not in this set.
    pub fn add_before<S: Stage<DB> + 'static>(mut self, stage: S, before: StageId) -> Self {
        let target_index = self.index_of(before);
        self.order.insert(target_index, stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Adds the given [`Stage`] after the stage with the given [`StageId`].
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    ///
    /// # Panics
    ///
    /// Panics if the dependency stage is not in this set.
    pub fn add_after<S: Stage<DB> + 'static>(mut self, stage: S, after: StageId) -> Self {
        let target_index = self.index_of(after) + 1;
        self.order.insert(target_index, stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Enables the given stage.
    ///
    /// All stages within a [`StageSet`] are enabled by default.
    ///
    /// # Panics
    ///
    /// Panics if the stage is not in this set.
    pub fn enable(mut self, stage_id: StageId) -> Self {
        let mut entry =
            self.stages.get_mut(&stage_id).expect("Cannot enable a stage that is not in the set.");
        entry.enabled = true;
        self
    }

    /// Disables the given stage.
    ///
    /// The disabled [`Stage`] keeps its place in the set, so it can be used for ordering with
    /// [`add_before`] or [`add_after`], or it can be re-enabled.
    ///
    /// All stages within a [`StageSet`] are enabled by default.
    ///
    /// # Panics
    ///
    /// Panics if the stage is not in this set.
    pub fn disable(mut self, stage_id: StageId) -> Self {
        let mut entry =
            self.stages.get_mut(&stage_id).expect("Cannot disable a stage that is not in the set.");
        entry.enabled = false;
        self
    }

    /// Consumes the builder and returns the contained [`Stage`]s in the order specified.
    pub fn build(mut self) -> Vec<Box<dyn Stage<DB>>> {
        let mut stages = Vec::new();
        for id in &self.order {
            if let Some(entry) = self.stages.remove(id) {
                if entry.enabled {
                    stages.push(entry.stage);
                }
            }
        }
        stages
    }
}

impl<DB: Database> StageSet<DB> for StageSetBuilder<DB> {
    fn builder(self) -> StageSetBuilder<DB> {
        self
    }
}
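To make the ordering semantics concrete, a hedged sketch of driving this builder; `StageA`, `StageB` and `StageC` are hypothetical `Stage<DB>` implementations, and the stage IDs are illustrative:

    let stages = StageSetBuilder::default()
        .add_stage(StageA)                  // order: [A]
        .add_stage(StageB)                  // order: [A, B]
        .add_before(StageC, StageId("B"))   // order: [A, C, B]
        .disable(StageId("A"))              // A keeps its slot but is filtered out
        .build();                           // yields the boxed stages [C, B]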
@@ -1,12 +1,10 @@
-use crate::{
-    pipeline::event::PipelineEvent,
-    util::{opt, opt::MaybeSender},
-};
+use crate::{pipeline::PipelineEventListeners, util::opt};
 use reth_primitives::BlockNumber;

 /// The state of the pipeline during execution.
+#[derive(Default)]
 pub(crate) struct PipelineState {
-    pub(crate) events_sender: MaybeSender<PipelineEvent>,
+    pub(crate) listeners: PipelineEventListeners,
     pub(crate) max_block: Option<BlockNumber>,
     /// The maximum progress achieved by any stage during the execution of the pipeline.
     pub(crate) maximum_progress: Option<BlockNumber>,
@@ -30,12 +28,7 @@ mod tests {

     #[test]
     fn record_progress_outliers() {
-        let mut state = PipelineState {
-            events_sender: MaybeSender::new(None),
-            max_block: None,
-            maximum_progress: None,
-            minimum_progress: None,
-        };
+        let mut state = PipelineState::default();

         state.record_progress_outliers(10);
         assert_eq!(state.minimum_progress, Some(10));

crates/stages/src/prelude.rs (new file, 9 lines)
@@ -0,0 +1,9 @@
pub use crate::{
    error::{PipelineError, StageError},
    id::StageId,
    pipeline::{Pipeline, PipelineBuilder, PipelineEvent, StageSet, StageSetBuilder},
    sets::{
        DefaultStages, ExecutionStages, HashingStages, HistoryIndexingStages, OfflineStages,
        OnlineStages,
    },
};
crates/stages/src/sets.rs (new file, 186 lines)
@@ -0,0 +1,186 @@
//! Built-in [`StageSet`]s.
//!
//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
//! instance of reth.
//!
//! It is also possible to run parts of reth standalone given the required data is present in
//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
//!
//! # Examples
//!
//! ```no_run
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::{OfflineStages};
//! // Build a pipeline with all offline stages.
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder().add_stages(OfflineStages::default()).build();
//! ```
//!
//! ```ignore
//! # use reth_stages::Pipeline;
//! # use reth_stages::{StageSet, sets::OfflineStages};
//! // Build a pipeline with all offline stages and a custom stage at the end.
//! Pipeline::builder()
//!     .add_stages(
//!         OfflineStages::default().builder().add_stage(MyCustomStage)
//!     )
//!     .build();
//! ```
use crate::{
    stages::{
        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
        TotalDifficultyStage, TransactionLookupStage,
    },
    StageSet, StageSetBuilder,
};
use reth_db::database::Database;
use reth_interfaces::{
    consensus::Consensus,
    p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader},
};
use reth_primitives::ChainSpec;
use std::sync::Arc;

/// A set containing all stages to run a fully syncing instance of reth.
///
/// A combination of (in order)
///
/// - [`OnlineStages`]
/// - [`OfflineStages`]
#[derive(Debug)]
pub struct DefaultStages<H, B> {
    /// Configuration for the online stages
    online: OnlineStages<H, B>,
}

impl<H, B> DefaultStages<H, B> {
    /// Create a new set of default stages with default values.
    pub fn new(consensus: Arc<dyn Consensus>, header_downloader: H, body_downloader: B) -> Self {
        Self { online: OnlineStages::new(consensus, header_downloader, body_downloader) }
    }
}

impl<DB, H, B> StageSet<DB> for DefaultStages<H, B>
where
    DB: Database,
    H: HeaderDownloader + 'static,
    B: BodyDownloader + 'static,
{
    fn builder(self) -> StageSetBuilder<DB> {
        self.online.builder().add_set(OfflineStages)
    }
}

/// A set containing all stages that require network access by default.
///
/// These stages *can* be run without network access if the specified downloaders are
/// themselves offline.
#[derive(Debug)]
pub struct OnlineStages<H, B> {
    /// The consensus engine used to validate incoming data.
    consensus: Arc<dyn Consensus>,
    /// The block header downloader
    header_downloader: H,
    /// The block body downloader
    body_downloader: B,
}

impl<H, B> OnlineStages<H, B> {
    /// Create a new set of online stages with default values.
    pub fn new(consensus: Arc<dyn Consensus>, header_downloader: H, body_downloader: B) -> Self {
        Self { consensus, header_downloader, body_downloader }
    }
}

impl<DB, H, B> StageSet<DB> for OnlineStages<H, B>
where
    DB: Database,
    H: HeaderDownloader + 'static,
    B: BodyDownloader + 'static,
{
    fn builder(self) -> StageSetBuilder<DB> {
        StageSetBuilder::default()
            .add_stage(HeaderStage::new(self.header_downloader, self.consensus.clone()))
            .add_stage(TotalDifficultyStage::default())
            .add_stage(BodyStage { downloader: self.body_downloader, consensus: self.consensus })
            .add_stage(TransactionLookupStage::default())
    }
}

/// A set containing all stages that do not require network access.
///
/// A combination of (in order)
///
/// - [`ExecutionStages`]
/// - [`HashingStages`]
/// - [`HistoryIndexingStages`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct OfflineStages;

impl<DB: Database> StageSet<DB> for OfflineStages {
    fn builder(self) -> StageSetBuilder<DB> {
        ExecutionStages::default().builder().add_set(HashingStages).add_set(HistoryIndexingStages)
    }
}

/// A set containing all stages that are required to execute pre-existing block data.
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages {
    /// The chain specification to use for execution.
    chain_spec: ChainSpec,
}

impl Default for ExecutionStages {
    fn default() -> Self {
        Self { chain_spec: reth_primitives::MAINNET.clone() }
    }
}

impl ExecutionStages {
    /// Create a new set of execution stages with default values.
    pub fn new(chain_spec: ChainSpec) -> Self {
        Self { chain_spec }
    }
}

impl<DB: Database> StageSet<DB> for ExecutionStages {
    fn builder(self) -> StageSetBuilder<DB> {
        StageSetBuilder::default()
            .add_stage(SenderRecoveryStage::default())
            .add_stage(ExecutionStage { chain_spec: self.chain_spec, ..Default::default() })
    }
}

/// A set containing all stages that hash account state.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages;

impl<DB: Database> StageSet<DB> for HashingStages {
    fn builder(self) -> StageSetBuilder<DB> {
        StageSetBuilder::default()
            .add_stage(MerkleStage::Unwind)
            .add_stage(AccountHashingStage::default())
            .add_stage(StorageHashingStage::default())
            .add_stage(MerkleStage::Execution)
    }
}

/// A set containing all stages that do additional indexing for historical state.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages;

impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
    fn builder(self) -> StageSetBuilder<DB> {
        StageSetBuilder::default()
            .add_stage(IndexStorageHistoryStage::default())
            .add_stage(IndexAccountHistoryStage::default())
    }
}
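A hedged sketch of the override path these sets enable; `CustomSenderRecovery` is a hypothetical replacement stage, and for `set` to find it, its `id()` must match the `StageId` of the built-in `SenderRecoveryStage`:

    // Swap one stage inside a set, then hand the customized set to the pipeline.
    // `set` panics if no stage with the same StageId is already in the set.
    Pipeline::builder()
        .add_stages(ExecutionStages::default().set(CustomSenderRecovery::default()))
        .build();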
@@ -17,7 +17,8 @@ use reth_interfaces::{
 use std::sync::Arc;
 use tracing::*;

-pub(crate) const BODIES: StageId = StageId("Bodies");
+/// The [`StageId`] of the bodies downloader stage.
+pub const BODIES: StageId = StageId("Bodies");

 // TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
 /// The body stage downloads block bodies.

@@ -20,7 +20,8 @@ use reth_provider::LatestStateProviderRef;
 use std::fmt::Debug;
 use tracing::*;

-const EXECUTION: StageId = StageId("Execution");
+/// The [`StageId`] of the execution stage.
+pub const EXECUTION: StageId = StageId("Execution");

 /// The execution stage executes all transactions and
 /// update history indexes.

@@ -14,20 +14,26 @@ use std::{
 };
 use tracing::*;

-const ACCOUNT_HASHING: StageId = StageId("AccountHashingStage");
+/// The [`StageId`] of the account hashing stage.
+pub const ACCOUNT_HASHING: StageId = StageId("AccountHashingStage");

 /// Account hashing stage hashes plain account.
 /// This is preparation before generating intermediate hashes and calculating Merkle tree root.
 #[derive(Debug)]
 pub struct AccountHashingStage {
-    /// The threshold for switching from incremental hashing
-    /// of changes to whole storage hashing. Num of transitions.
+    /// The threshold (in number of state transitions) for switching between incremental
+    /// hashing and full storage hashing.
     pub clean_threshold: u64,
-    /// The size of inserted items after which the control
-    /// flow will be returned to the pipeline for commit.
+    /// The maximum number of blocks to process before committing.
     pub commit_threshold: u64,
 }

+impl Default for AccountHashingStage {
+    fn default() -> Self {
+        Self { clean_threshold: 500_000, commit_threshold: 100_000 }
+    }
+}
+
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for AccountHashingStage {
     /// Return the id of the stage

@@ -12,20 +12,26 @@ use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256};
 use std::{collections::BTreeMap, fmt::Debug};
 use tracing::*;

-const STORAGE_HASHING: StageId = StageId("StorageHashingStage");
+/// The [`StageId`] of the storage hashing stage.
+pub const STORAGE_HASHING: StageId = StageId("StorageHashingStage");

 /// Storage hashing stage hashes plain storage.
 /// This is preparation before generating intermediate hashes and calculating Merkle tree root.
 #[derive(Debug)]
 pub struct StorageHashingStage {
-    /// The threshold for switching from incremental hashing
-    /// of changes to whole storage hashing. Num of transitions.
+    /// The threshold (in number of state transitions) for switching between incremental
+    /// hashing and full storage hashing.
     pub clean_threshold: u64,
-    /// The size of inserted items after which the control
-    /// flow will be returned to the pipeline for commit
+    /// The maximum number of blocks to process before committing.
     pub commit_threshold: u64,
 }

+impl Default for StorageHashingStage {
+    fn default() -> Self {
+        Self { clean_threshold: 500_000, commit_threshold: 100_000 }
+    }
+}
+
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for StorageHashingStage {
     /// Return the id of the stage

@@ -12,16 +12,14 @@ use reth_db::{
 };
 use reth_interfaces::{
     consensus::{Consensus, ForkchoiceState},
-    p2p::headers::{
-        client::StatusUpdater,
-        downloader::{HeaderDownloader, SyncTarget},
-    },
+    p2p::headers::downloader::{HeaderDownloader, SyncTarget},
 };
-use reth_primitives::{BlockNumber, Header, SealedHeader, U256};
+use reth_primitives::{BlockNumber, Header, SealedHeader};
 use std::sync::Arc;
 use tracing::*;

-pub(crate) const HEADERS: StageId = StageId("Headers");
+/// The [`StageId`] of the headers downloader stage.
+pub const HEADERS: StageId = StageId("Headers");

 /// The headers stage.
 ///
@@ -37,34 +35,22 @@ pub(crate) const HEADERS: StageId = StageId("Headers");
 /// NOTE: This stage downloads headers in reverse. Upon returning the control flow to the pipeline,
 /// the stage progress is not updated unless this stage is done.
 #[derive(Debug)]
-pub struct HeaderStage<D: HeaderDownloader, S: StatusUpdater> {
+pub struct HeaderStage<D: HeaderDownloader> {
     /// Strategy for downloading the headers
-    pub downloader: D,
+    downloader: D,
     /// Consensus client implementation
-    pub consensus: Arc<dyn Consensus>,
-    /// Emits updates about the sync status
-    pub sync_status_updates: S,
+    consensus: Arc<dyn Consensus>,
 }

 // === impl HeaderStage ===

-impl<D, S> HeaderStage<D, S>
+impl<D> HeaderStage<D>
 where
     D: HeaderDownloader,
-    S: StatusUpdater,
 {
-    fn update_head<DB: Database>(
-        &self,
-        tx: &Transaction<'_, DB>,
-        height: BlockNumber,
-    ) -> Result<(), StageError> {
-        let block_key = tx.get_block_numhash(height)?;
-        let td: U256 = *tx
-            .get::<tables::HeaderTD>(block_key)?
-            .ok_or(DatabaseIntegrityError::TotalDifficulty { number: height })?;
-        // TODO: This should happen in the last stage
-        self.sync_status_updates.update_status(height, block_key.hash(), td);
-        Ok(())
+    /// Create a new header stage
+    pub fn new(downloader: D, consensus: Arc<dyn Consensus>) -> Self {
+        Self { downloader, consensus }
     }

     fn is_stage_done<DB: Database>(
@@ -175,11 +161,10 @@ where
 }

 #[async_trait::async_trait]
-impl<DB, D, S> Stage<DB> for HeaderStage<D, S>
+impl<DB, D> Stage<DB> for HeaderStage<D>
 where
     DB: Database,
     D: HeaderDownloader,
-    S: StatusUpdater,
 {
     /// Return the id of the stage
     fn id(&self) -> StageId {
@@ -194,7 +179,6 @@ where
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
         let current_progress = input.stage_progress.unwrap_or_default();
-        self.update_head::<DB>(tx, current_progress)?;

         // Lookup the head and tip of the sync range
         let gap = self.get_sync_gap(tx, current_progress).await?;
@@ -305,7 +289,7 @@ mod tests {
         p2p::headers::downloader::HeaderDownloader,
         test_utils::{
             generators::{random_header, random_header_range},
-            TestConsensus, TestHeaderDownloader, TestHeadersClient, TestStatusUpdater,
+            TestConsensus, TestHeaderDownloader, TestHeadersClient,
         },
     };
     use reth_primitives::{BlockNumber, SealedHeader, U256};
@@ -315,7 +299,6 @@ mod tests {
         pub(crate) consensus: Arc<TestConsensus>,
         pub(crate) client: Arc<TestHeadersClient>,
         downloader_factory: Box<dyn Fn() -> D + Send + Sync + 'static>,
-        network_handle: TestStatusUpdater,
         tx: TestTransaction,
     }

@@ -329,14 +312,13 @@ mod tests {
                 downloader_factory: Box::new(move || {
                     TestHeaderDownloader::new(client.clone(), consensus.clone(), 1000, 1000)
                 }),
-                network_handle: TestStatusUpdater::default(),
                 tx: TestTransaction::default(),
             }
         }
     }

     impl<D: HeaderDownloader + 'static> StageTestRunner for HeadersTestRunner<D> {
-        type S = HeaderStage<D, TestStatusUpdater>;
+        type S = HeaderStage<D>;

         fn tx(&self) -> &TestTransaction {
             &self.tx
@@ -346,7 +328,6 @@ mod tests {
             HeaderStage {
                 consensus: self.consensus.clone(),
                 downloader: (*self.downloader_factory)(),
-                sync_status_updates: self.network_handle.clone(),
             }
         }
     }
@@ -447,7 +428,6 @@ mod tests {
                     .stream_batch_size(500)
                     .build(consensus.clone(), client.clone())
                 }),
-                network_handle: TestStatusUpdater::default(),
                 tx: TestTransaction::default(),
             }
         }

@@ -5,28 +5,48 @@ use reth_db::database::Database;
 use std::fmt::Debug;
 use tracing::*;

-const MERKLE_EXECUTION: StageId = StageId("MerkleExecuteStage");
-const MERKLE_UNWIND: StageId = StageId("MerkleUnwindStage");
+/// The [`StageId`] of the merkle hashing execution stage.
+pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecuteStage");
+
-/// Merkle stage uses input from [AccountHashingStage] and [StorageHashingStage] stages
-/// and calculated intermediate hashed and state root.
-/// This stage depends on the Account and Storage stages. It will be executed after them during
-/// execution, and before them during unwinding.
+/// The [`StageId`] of the merkle hashing unwind stage.
+pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwindStage");
+
+/// The merkle hashing stage uses input from
+/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
+/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
+/// and state roots.
+///
+/// This stage should be run with the above two stages, otherwise it is a no-op.
+///
+/// This stage is split in two: one for calculating hashes and one for unwinding.
+///
+/// When run in execution, it's going to be executed AFTER the hashing stages, to generate
+/// the state root. When run in unwind mode, it's going to be executed BEFORE the hashing stages,
+/// so that it unwinds the intermediate hashes based on the unwound hashed state from the hashing
+/// stages. The order of these two variants is important. The unwind variant should be added to the
+/// pipeline before the execution variant.
+///
+/// An example pipeline to only hash state would be:
+///
+/// - [`MerkleStage::Unwind`]
+/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
+/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
+/// - [`MerkleStage::Execution`]
 #[derive(Debug)]
-pub struct MerkleStage {
-    /// Flag if true would do `execute` but skip unwind but if it false it would skip execution but
-    /// do unwind.
-    pub is_execute: bool,
+pub enum MerkleStage {
+    /// The execution portion of the hashing stage.
+    Execution,
+    /// The unwind portion of the hashing stage.
+    Unwind,
 }

 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for MerkleStage {
     /// Return the id of the stage
     fn id(&self) -> StageId {
-        if self.is_execute {
-            MERKLE_EXECUTION
-        } else {
-            MERKLE_UNWIND
+        match self {
+            MerkleStage::Execution => MERKLE_EXECUTION,
+            MerkleStage::Unwind => MERKLE_UNWIND,
         }
     }

@@ -36,7 +56,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
         _tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
-        if !self.is_execute {
+        if matches!(self, MerkleStage::Unwind) {
             info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
             return Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
         }
@@ -53,7 +73,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
         _tx: &mut Transaction<'_, DB>,
         input: UnwindInput,
     ) -> Result<UnwindOutput, StageError> {
-        if self.is_execute {
+        if matches!(self, MerkleStage::Execution) {
             info!(target: "sync::stages::merkle::exec", "Stage is always skipped");
             return Ok(UnwindOutput { stage_progress: input.unwind_to })
         }

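For reference, the ordering the doc comment above prescribes is exactly what the `HashingStages` set from the new sets.rs builds; a condensed sketch (the `Env<WriteMap>` database type is only an assumption to pin the generic):

    // Unwind variant first, hashing stages in between, execution variant last.
    let hashing = StageSetBuilder::<Env<WriteMap>>::default()
        .add_stage(MerkleStage::Unwind)
        .add_stage(AccountHashingStage::default())
        .add_stage(StorageHashingStage::default())
        .add_stage(MerkleStage::Execution);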
@@ -1,22 +1,34 @@
 /// The bodies stage.
-pub mod bodies;
+mod bodies;
 /// The execution stage that generates state diff.
-pub mod execution;
+mod execution;
 /// Account hashing stage.
-pub mod hashing_account;
+mod hashing_account;
 /// Storage hashing stage.
-pub mod hashing_storage;
+mod hashing_storage;
 /// The headers stage.
-pub mod headers;
-/// Intex history of account changes
-pub mod index_account_history;
+mod headers;
+/// Index history of account changes
+mod index_account_history;
 /// Index history of storage changes
-pub mod index_storage_history;
+mod index_storage_history;
 /// Intermediate hashes and creating merkle root
-pub mod merkle;
+mod merkle;
 /// The sender recovery stage.
-pub mod sender_recovery;
+mod sender_recovery;
 /// The total difficulty stage
-pub mod total_difficulty;
+mod total_difficulty;
 /// The transaction lookup stage
-pub mod tx_lookup;
+mod tx_lookup;

+pub use bodies::*;
+pub use execution::*;
+pub use hashing_account::*;
+pub use hashing_storage::*;
+pub use headers::*;
+pub use index_account_history::*;
+pub use index_storage_history::*;
+pub use merkle::*;
+pub use sender_recovery::*;
+pub use total_difficulty::*;
+pub use tx_lookup::*;

@@ -30,6 +30,12 @@ pub struct SenderRecoveryStage {
     pub commit_threshold: u64,
 }

+impl Default for SenderRecoveryStage {
+    fn default() -> Self {
+        Self { batch_size: 1000, commit_threshold: 5000 }
+    }
+}
+
 // TODO(onbjerg): Should unwind
 #[derive(Error, Debug)]
 enum SenderRecoveryStageError {

@@ -24,6 +24,12 @@ pub struct TotalDifficultyStage {
     pub commit_threshold: u64,
 }

+impl Default for TotalDifficultyStage {
+    fn default() -> Self {
+        Self { commit_threshold: 100_000 }
+    }
+}
+
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for TotalDifficultyStage {
     /// Return the id of the stage

@@ -23,6 +23,12 @@ pub struct TransactionLookupStage {
     commit_threshold: u64,
 }

+impl Default for TransactionLookupStage {
+    fn default() -> Self {
+        Self { commit_threshold: 50_000 }
+    }
+}
+
 impl TransactionLookupStage {
     /// Create new instance of [TransactionLookupStage].
     pub fn new(commit_threshold: u64) -> Self {

@@ -1,6 +1,4 @@
 pub(crate) mod opt {
-    use tokio::sync::mpsc::{error::SendError, Sender};
-
     /// Get an [Option] with the maximum value, compared between the passed in value and the inner
     /// value of the [Option]. If the [Option] is `None`, then an option containing the passed in
     /// value will be returned.
@@ -15,33 +13,6 @@ pub(crate) mod opt {
         a.map_or(Some(b), |v| Some(std::cmp::min(v, b)))
     }

-    /// The producing side of a [tokio::mpsc] channel that may or may not be set.
-    #[derive(Default, Clone)]
-    pub(crate) struct MaybeSender<T> {
-        inner: Option<Sender<T>>,
-    }
-
-    impl<T> MaybeSender<T> {
-        /// Create a new [MaybeSender]
-        pub(crate) fn new(sender: Option<Sender<T>>) -> Self {
-            Self { inner: sender }
-        }
-
-        /// Send a value over the channel if an internal sender has been set.
-        pub(crate) async fn send(&self, value: T) -> Result<(), SendError<T>> {
-            if let Some(rx) = &self.inner {
-                rx.send(value).await
-            } else {
-                Ok(())
-            }
-        }
-
-        /// Set or unset the internal sender.
-        pub(crate) fn set(&mut self, sender: Option<Sender<T>>) {
-            self.inner = sender;
-        }
-    }
-
     #[cfg(test)]
     mod tests {
         use super::*;