From 6868dda544bbb632f0fb84518da732c10f401286 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Mon, 3 Oct 2022 14:40:50 +0200 Subject: [PATCH] feat: basic staged sync crate (#6) * feat: basic staged sync crate * refactor: rename to `reth-stages` * feat: remove eyre * feat: impl `Error` for `StageError` * chore: bump cargo lock * chore: nits * docs: clarify unwind Co-authored-by: Georgios Konstantopoulos --- Cargo.lock | 46 ++++++++++----- Cargo.toml | 3 +- crates/stages/Cargo.toml | 13 +++++ crates/stages/src/lib.rs | 107 ++++++++++++++++++++++++++++++++++ crates/stages/src/pipeline.rs | 103 ++++++++++++++++++++++++++++++++ 5 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 crates/stages/Cargo.toml create mode 100644 crates/stages/src/lib.rs create mode 100644 crates/stages/src/pipeline.rs diff --git a/Cargo.lock b/Cargo.lock index ee140b7b4..d8258af25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "async-trait" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "auto_impl" version = "1.0.1" @@ -171,9 +182,9 @@ dependencies = [ [[package]] name = "ecdsa" -version = "0.14.7" +version = "0.14.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85789ce7dfbd0f0624c07ef653a08bb2ebf43d3e16531361f46d36dd54334fed" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" dependencies = [ "der", "elliptic-curve", @@ -247,7 +258,7 @@ dependencies = [ [[package]] name = "ethers-core" version = "0.17.0" -source = "git+https://github.com/gakonst/ethers-rs#d8791482d566e2203ab6a178524f1ed6705fe274" +source = "git+https://github.com/gakonst/ethers-rs#b2fc9fdf50d6fe3e81de0ac5648a068425cf87a7" dependencies = [ "arrayvec", "bytes", @@ -422,9 +433,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" [[package]] name = "k256" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3636d281d46c3b64182eb3a0a42b7b483191a2ecc3f05301fa67403f7c9bc949" +checksum = "72c1e0b51e7ec0a97369623508396067a486bd0cbed95a2659a4b863d28cfc8b" dependencies = [ "cfg-if", "ecdsa", @@ -441,9 +452,9 @@ checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" [[package]] name = "libc" -version = "0.2.133" +version = "0.2.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" +checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb" [[package]] name = "memchr" @@ -568,9 +579,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.44" +version = "1.0.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58" +checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" dependencies = [ "unicode-ident", ] @@ -656,6 +667,15 @@ dependencies = [ "ethers-core", ] +[[package]] +name = "reth-stages" +version = "0.1.0" +dependencies = [ + "async-trait", + "reth-primitives", + "thiserror", +] + [[package]] name = "rfc6979" version = "0.3.0" @@ -856,18 +876,18 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "thiserror" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a99cb8c4b9a8ef0e7907cd3b617cc8dc04d571c4e73c8ae403d80ac160bb122" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a891860d3c8d66fec8e73ddb3765f90082374dbaaa833407b904a94f1a7eb43" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index b22c2a824..506f1e54d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" members = [ "crate-template", "crates/primitives", - "crates/net/p2p" + "crates/net/p2p", + "crates/stages" ] [dependencies] diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml new file mode 100644 index 000000000..e635d30fa --- /dev/null +++ b/crates/stages/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "reth-stages" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = "Staged syncing primitives used in reth." + +[dependencies] +reth-primitives = { path = "../primitives" } +async-trait = "0.1.57" +thiserror = "1.0.37" \ No newline at end of file diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs new file mode 100644 index 000000000..7edf87e08 --- /dev/null +++ b/crates/stages/src/lib.rs @@ -0,0 +1,107 @@ +#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] +//! Staged syncing primitives for reth. +//! +//! See [Stage] and [Pipeline]. + +use async_trait::async_trait; +use reth_primitives::U64; +use thiserror::Error; + +mod pipeline; +pub use pipeline::*; + +/// Stage execution input, see [Stage::execute]. +#[derive(Clone, Copy, Debug)] +pub struct ExecInput { + /// The stage that was run before the current stage and the block number it reached. + pub previous_stage: Option<(StageId, U64)>, + /// The progress of this stage the last time it was executed. + pub stage_progress: Option, +} + +/// Stage unwind input, see [Stage::unwind]. +#[derive(Clone, Copy, Debug)] +pub struct UnwindInput { + /// The current highest block of the stage. + pub stage_progress: U64, + /// The block to unwind to. + pub unwind_to: U64, + /// The bad block that caused the unwind, if any. + pub bad_block: Option, +} + +/// The output of a stage execution. +#[derive(Debug, PartialEq, Eq)] +pub struct ExecOutput { + /// How far the stage got. + pub stage_progress: U64, + /// Whether or not the stage is done. + pub done: bool, + /// Whether or not the stage reached the tip of the chain. + pub reached_tip: bool, +} + +/// The output of a stage unwinding. +#[derive(Debug, PartialEq, Eq)] +pub struct UnwindOutput { + /// The block at which the stage has unwound to. + pub stage_progress: U64, +} + +/// A stage execution error. +#[derive(Error, Debug)] +pub enum StageError { + /// The stage encountered a state validation error. + /// + /// TODO: This depends on the consensus engine and should include the validation failure reason + #[error("Stage encountered a validation error.")] + Validation, + /// The stage encountered an internal error. + #[error(transparent)] + Internal(Box), +} + +/// The ID of a stage. +/// +/// Each stage ID must be unique. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct StageId(pub &'static str); + +/// A stage is a segmented part of the syncing process of the node. +/// +/// Each stage takes care of a well-defined task, such as downloading headers or executing +/// transactions, and persist their results to a database. +/// +/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards" +/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]). +/// +/// Stages are executed as part of a pipeline where they are executed serially. +#[async_trait] +pub trait Stage { + /// Get the ID of the stage. + /// + /// Stage IDs must be unique. + fn id(&self) -> StageId; + + /// Execute the stage. + async fn execute( + &mut self, + tx: &mut dyn DbTransaction, + input: ExecInput, + ) -> Result; + + /// Unwind the stage. + async fn unwind( + &mut self, + tx: &mut dyn DbTransaction, + input: UnwindInput, + ) -> Result>; +} + +/// TODO: Stand-in for database-related abstractions. +pub trait DbTransaction {} diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs new file mode 100644 index 000000000..7fb874f14 --- /dev/null +++ b/crates/stages/src/pipeline.rs @@ -0,0 +1,103 @@ +use crate::Stage; +use reth_primitives::U64; +use std::fmt::{Debug, Formatter}; + +#[allow(dead_code)] +struct QueuedStage { + /// The actual stage to execute. + stage: Box, + /// The unwind priority of the stage. + unwind_priority: usize, + /// Whether or not this stage can only execute when we reach what we believe to be the tip of + /// the chain. + require_tip: bool, +} + +/// A staged sync pipeline. +/// +/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip +/// of the chain and the pipeline then executes each stage in order from the current local chain tip +/// and the external chain tip. When a stage is executed, it will run until it reaches the chain +/// tip. +/// +/// After the entire pipeline has been run, it will run again unless asked to stop (see +/// [Pipeline::set_exit_after_sync]). +/// +/// # Unwinding +/// +/// In case of a validation error (as determined by the consensus engine) in one of the stages, the +/// pipeline will unwind the stages according to their unwind priority. It is also possible to +/// request an unwind manually (see [Pipeline::start_with_unwind]). +/// +/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind +/// priorities are unwound first. +#[derive(Default)] +pub struct Pipeline { + stages: Vec, + unwind_to: Option, + max_block: Option, + exit_after_sync: bool, +} + +impl Debug for Pipeline { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Pipeline") + .field("unwind_to", &self.unwind_to) + .field("max_block", &self.max_block) + .field("exit_after_sync", &self.exit_after_sync) + .finish() + } +} + +impl Pipeline { + /// Add a stage to the pipeline. + /// + /// # Unwinding + /// + /// The unwind priority is set to 0. + pub fn push(&mut self, stage: S, require_tip: bool) -> &mut Self + where + S: Stage + 'static, + { + self.push_with_unwind_priority(stage, require_tip, 0) + } + + /// Add a stage to the pipeline, specifying the unwind priority. + pub fn push_with_unwind_priority( + &mut self, + stage: S, + require_tip: bool, + unwind_priority: usize, + ) -> &mut Self + where + S: Stage + 'static, + { + self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority }); + self + } + + /// Set the target block. + /// + /// Once this block is reached, syncing will stop. + pub fn set_max_block(&mut self, block: Option) -> &mut Self { + self.max_block = block; + self + } + + /// Start the pipeline by unwinding to the specified block. + pub fn start_with_unwind(&mut self, unwind_to: Option) -> &mut Self { + self.unwind_to = unwind_to; + self + } + + /// Control whether the pipeline should exit after syncing. + pub fn set_exit_after_sync(&mut self, exit: bool) -> &mut Self { + self.exit_after_sync = exit; + self + } + + /// Run the pipeline. + pub async fn run(&mut self) -> Result<(), Box> { + todo!() + } +}