Merge branch 'master' into matt/rpc-support

This commit is contained in:
Matthias Seitz
2022-10-03 14:56:38 +02:00
5 changed files with 240 additions and 37 deletions

52
Cargo.lock generated
View File

@ -236,9 +236,9 @@ dependencies = [
[[package]]
name = "ecdsa"
version = "0.14.7"
version = "0.14.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85789ce7dfbd0f0624c07ef653a08bb2ebf43d3e16531361f46d36dd54334fed"
checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
dependencies = [
"der",
"elliptic-curve",
@ -312,7 +312,7 @@ dependencies = [
[[package]]
name = "ethers-core"
version = "0.17.0"
source = "git+https://github.com/gakonst/ethers-rs#d8791482d566e2203ab6a178524f1ed6705fe274"
source = "git+https://github.com/gakonst/ethers-rs#b2fc9fdf50d6fe3e81de0ac5648a068425cf87a7"
dependencies = [
"arrayvec",
"bytes",
@ -792,9 +792,9 @@ dependencies = [
[[package]]
name = "k256"
version = "0.11.5"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3636d281d46c3b64182eb3a0a42b7b483191a2ecc3f05301fa67403f7c9bc949"
checksum = "72c1e0b51e7ec0a97369623508396067a486bd0cbed95a2659a4b863d28cfc8b"
dependencies = [
"cfg-if",
"ecdsa",
@ -817,9 +817,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.133"
version = "0.2.134"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966"
checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb"
[[package]]
name = "lock_api"
@ -1046,9 +1046,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.44"
version = "1.0.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58"
checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b"
dependencies = [
"unicode-ident",
]
@ -1144,36 +1144,14 @@ dependencies = [
]
[[package]]
name = "reth-rpc"
name = "reth-stages"
version = "0.1.0"
dependencies = [
"async-trait",
"reth-primitives",
"reth-rpc-api",
"reth-rpc-types",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "reth-rpc-api"
version = "0.1.0"
dependencies = [
"jsonrpsee",
"reth-primitives",
"reth-rpc-types",
]
[[package]]
name = "reth-rpc-types"
version = "0.1.0"
dependencies = [
"fastrlp",
"reth-primitives",
"serde",
"serde_json",
]
[[package]]
name = "rfc6979"
version = "0.3.0"
@ -1439,18 +1417,18 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "thiserror"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a99cb8c4b9a8ef0e7907cd3b617cc8dc04d571c4e73c8ae403d80ac160bb122"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a891860d3c8d66fec8e73ddb3765f90082374dbaaa833407b904a94f1a7eb43"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",

View File

@ -11,6 +11,8 @@ members = [
"crates/net/rpc-api",
"crates/net/rpc-types",
"crates/primitives",
"crates/net/p2p",
"crates/stages"
]
[dependencies]

13
crates/stages/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "reth-stages"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = "Staged syncing primitives used in reth."
[dependencies]
reth-primitives = { path = "../primitives" }
async-trait = "0.1.57"
thiserror = "1.0.37"

107
crates/stages/src/lib.rs Normal file
View File

@ -0,0 +1,107 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Staged syncing primitives for reth.
//!
//! See [Stage] and [Pipeline].
use async_trait::async_trait;
use reth_primitives::U64;
use thiserror::Error;
mod pipeline;
pub use pipeline::*;
/// Stage execution input, see [Stage::execute].
#[derive(Clone, Copy, Debug)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, U64)>,
/// The progress of this stage the last time it was executed.
pub stage_progress: Option<U64>,
}
/// Stage unwind input, see [Stage::unwind].
#[derive(Clone, Copy, Debug)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: U64,
/// The block to unwind to.
pub unwind_to: U64,
/// The bad block that caused the unwind, if any.
pub bad_block: Option<U64>,
}
/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: U64,
/// Whether or not the stage is done.
pub done: bool,
/// Whether or not the stage reached the tip of the chain.
pub reached_tip: bool,
}
/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: U64,
}
/// A stage execution error.
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error.")]
Validation,
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
}
/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StageId(pub &'static str);
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
/// transactions, and persist their results to a database.
///
/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
#[async_trait]
pub trait Stage {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut dyn DbTransaction,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut dyn DbTransaction,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}
/// TODO: Stand-in for database-related abstractions.
pub trait DbTransaction {}

View File

@ -0,0 +1,103 @@
use crate::Stage;
use reth_primitives::U64;
use std::fmt::{Debug, Formatter};
#[allow(dead_code)]
struct QueuedStage {
/// The actual stage to execute.
stage: Box<dyn Stage>,
/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
/// the chain.
require_tip: bool,
}
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order from the current local chain tip
/// and the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [Pipeline::set_exit_after_sync]).
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages according to their unwind priority. It is also possible to
/// request an unwind manually (see [Pipeline::start_with_unwind]).
///
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind
/// priorities are unwound first.
#[derive(Default)]
pub struct Pipeline {
stages: Vec<QueuedStage>,
unwind_to: Option<U64>,
max_block: Option<U64>,
exit_after_sync: bool,
}
impl Debug for Pipeline {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("unwind_to", &self.unwind_to)
.field("max_block", &self.max_block)
.field("exit_after_sync", &self.exit_after_sync)
.finish()
}
}
impl Pipeline {
/// Add a stage to the pipeline.
///
/// # Unwinding
///
/// The unwind priority is set to 0.
pub fn push<S>(&mut self, stage: S, require_tip: bool) -> &mut Self
where
S: Stage + 'static,
{
self.push_with_unwind_priority(stage, require_tip, 0)
}
/// Add a stage to the pipeline, specifying the unwind priority.
pub fn push_with_unwind_priority<S>(
&mut self,
stage: S,
require_tip: bool,
unwind_priority: usize,
) -> &mut Self
where
S: Stage + 'static,
{
self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority });
self
}
/// Set the target block.
///
/// Once this block is reached, syncing will stop.
pub fn set_max_block(&mut self, block: Option<U64>) -> &mut Self {
self.max_block = block;
self
}
/// Start the pipeline by unwinding to the specified block.
pub fn start_with_unwind(&mut self, unwind_to: Option<U64>) -> &mut Self {
self.unwind_to = unwind_to;
self
}
/// Control whether the pipeline should exit after syncing.
pub fn set_exit_after_sync(&mut self, exit: bool) -> &mut Self {
self.exit_after_sync = exit;
self
}
/// Run the pipeline.
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
todo!()
}
}