feat: basic staged sync crate (#6)

* feat: basic staged sync crate

* refactor: rename to `reth-stages`

* feat: remove eyre

* feat: impl `Error` for `StageError`

* chore: bump cargo lock

* chore: nits

* docs: clarify unwind

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Bjerg
2022-10-03 14:40:50 +02:00
committed by GitHub
parent 6ff7cf5a74
commit 6868dda544
5 changed files with 258 additions and 14 deletions

46
Cargo.lock generated
View File

@ -17,6 +17,17 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "async-trait"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "auto_impl" name = "auto_impl"
version = "1.0.1" version = "1.0.1"
@ -171,9 +182,9 @@ dependencies = [
[[package]] [[package]]
name = "ecdsa" name = "ecdsa"
version = "0.14.7" version = "0.14.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85789ce7dfbd0f0624c07ef653a08bb2ebf43d3e16531361f46d36dd54334fed" checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
dependencies = [ dependencies = [
"der", "der",
"elliptic-curve", "elliptic-curve",
@ -247,7 +258,7 @@ dependencies = [
[[package]] [[package]]
name = "ethers-core" name = "ethers-core"
version = "0.17.0" version = "0.17.0"
source = "git+https://github.com/gakonst/ethers-rs#d8791482d566e2203ab6a178524f1ed6705fe274" source = "git+https://github.com/gakonst/ethers-rs#b2fc9fdf50d6fe3e81de0ac5648a068425cf87a7"
dependencies = [ dependencies = [
"arrayvec", "arrayvec",
"bytes", "bytes",
@ -422,9 +433,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]] [[package]]
name = "k256" name = "k256"
version = "0.11.5" version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3636d281d46c3b64182eb3a0a42b7b483191a2ecc3f05301fa67403f7c9bc949" checksum = "72c1e0b51e7ec0a97369623508396067a486bd0cbed95a2659a4b863d28cfc8b"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"ecdsa", "ecdsa",
@ -441,9 +452,9 @@ checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.133" version = "0.2.134"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb"
[[package]] [[package]]
name = "memchr" name = "memchr"
@ -568,9 +579,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.44" version = "1.0.46"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58" checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -656,6 +667,15 @@ dependencies = [
"ethers-core", "ethers-core",
] ]
[[package]]
name = "reth-stages"
version = "0.1.0"
dependencies = [
"async-trait",
"reth-primitives",
"thiserror",
]
[[package]] [[package]]
name = "rfc6979" name = "rfc6979"
version = "0.3.0" version = "0.3.0"
@ -856,18 +876,18 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.36" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a99cb8c4b9a8ef0e7907cd3b617cc8dc04d571c4e73c8ae403d80ac160bb122" checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.36" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a891860d3c8d66fec8e73ddb3765f90082374dbaaa833407b904a94f1a7eb43" checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -7,7 +7,8 @@ edition = "2021"
members = [ members = [
"crate-template", "crate-template",
"crates/primitives", "crates/primitives",
"crates/net/p2p" "crates/net/p2p",
"crates/stages"
] ]
[dependencies] [dependencies]

13
crates/stages/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "reth-stages"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = "Staged syncing primitives used in reth."
[dependencies]
reth-primitives = { path = "../primitives" }
async-trait = "0.1.57"
thiserror = "1.0.37"

107
crates/stages/src/lib.rs Normal file
View File

@ -0,0 +1,107 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Staged syncing primitives for reth.
//!
//! See [Stage] and [Pipeline].
use async_trait::async_trait;
use reth_primitives::U64;
use thiserror::Error;
mod pipeline;
pub use pipeline::*;
/// Stage execution input, see [Stage::execute].
#[derive(Clone, Copy, Debug)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, U64)>,
/// The progress of this stage the last time it was executed.
pub stage_progress: Option<U64>,
}
/// Stage unwind input, see [Stage::unwind].
#[derive(Clone, Copy, Debug)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: U64,
/// The block to unwind to.
pub unwind_to: U64,
/// The bad block that caused the unwind, if any.
pub bad_block: Option<U64>,
}
/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: U64,
/// Whether or not the stage is done.
pub done: bool,
/// Whether or not the stage reached the tip of the chain.
pub reached_tip: bool,
}
/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: U64,
}
/// A stage execution error.
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error.")]
Validation,
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
}
/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StageId(pub &'static str);
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
/// transactions, and persist their results to a database.
///
/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
#[async_trait]
pub trait Stage {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut dyn DbTransaction,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut dyn DbTransaction,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}
/// TODO: Stand-in for database-related abstractions.
pub trait DbTransaction {}

View File

@ -0,0 +1,103 @@
use crate::Stage;
use reth_primitives::U64;
use std::fmt::{Debug, Formatter};
#[allow(dead_code)]
struct QueuedStage {
/// The actual stage to execute.
stage: Box<dyn Stage>,
/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
/// the chain.
require_tip: bool,
}
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order from the current local chain tip
/// and the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [Pipeline::set_exit_after_sync]).
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages according to their unwind priority. It is also possible to
/// request an unwind manually (see [Pipeline::start_with_unwind]).
///
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind
/// priorities are unwound first.
#[derive(Default)]
pub struct Pipeline {
stages: Vec<QueuedStage>,
unwind_to: Option<U64>,
max_block: Option<U64>,
exit_after_sync: bool,
}
impl Debug for Pipeline {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("unwind_to", &self.unwind_to)
.field("max_block", &self.max_block)
.field("exit_after_sync", &self.exit_after_sync)
.finish()
}
}
impl Pipeline {
/// Add a stage to the pipeline.
///
/// # Unwinding
///
/// The unwind priority is set to 0.
pub fn push<S>(&mut self, stage: S, require_tip: bool) -> &mut Self
where
S: Stage + 'static,
{
self.push_with_unwind_priority(stage, require_tip, 0)
}
/// Add a stage to the pipeline, specifying the unwind priority.
pub fn push_with_unwind_priority<S>(
&mut self,
stage: S,
require_tip: bool,
unwind_priority: usize,
) -> &mut Self
where
S: Stage + 'static,
{
self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority });
self
}
/// Set the target block.
///
/// Once this block is reached, syncing will stop.
pub fn set_max_block(&mut self, block: Option<U64>) -> &mut Self {
self.max_block = block;
self
}
/// Start the pipeline by unwinding to the specified block.
pub fn start_with_unwind(&mut self, unwind_to: Option<U64>) -> &mut Self {
self.unwind_to = unwind_to;
self
}
/// Control whether the pipeline should exit after syncing.
pub fn set_exit_after_sync(&mut self, exit: bool) -> &mut Self {
self.exit_after_sync = exit;
self
}
/// Run the pipeline.
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
todo!()
}
}