chore!:rm beacon consensus crate (#13723)

This commit is contained in:
Matthias Seitz
2025-01-08 13:13:41 +01:00
committed by GitHub
parent baf92e33fb
commit dcd4b24ae1
15 changed files with 0 additions and 5401 deletions

56
Cargo.lock generated
View File

@ -6481,62 +6481,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "reth-beacon-consensus"
version = "1.1.5"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-primitives",
"alloy-rpc-types-engine",
"assert_matches",
"futures",
"itertools 0.13.0",
"metrics",
"reth-blockchain-tree",
"reth-blockchain-tree-api",
"reth-chainspec",
"reth-codecs",
"reth-config",
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-downloaders",
"reth-engine-primitives",
"reth-errors",
"reth-ethereum-consensus",
"reth-ethereum-engine-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-exex-types",
"reth-metrics",
"reth-network-p2p",
"reth-node-types",
"reth-payload-builder",
"reth-payload-builder-primitives",
"reth-payload-primitives",
"reth-payload-validator",
"reth-primitives",
"reth-primitives-traits",
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-rpc-types-compat",
"reth-stages",
"reth-stages-api",
"reth-static-file",
"reth-tasks",
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",
"schnellru",
"thiserror 2.0.9",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "reth-bench" name = "reth-bench"
version = "1.1.5" version = "1.1.5"

View File

@ -20,7 +20,6 @@ members = [
"crates/cli/runner/", "crates/cli/runner/",
"crates/cli/util/", "crates/cli/util/",
"crates/config/", "crates/config/",
"crates/consensus/beacon/",
"crates/consensus/common/", "crates/consensus/common/",
"crates/consensus/consensus/", "crates/consensus/consensus/",
"crates/consensus/debug-client/", "crates/consensus/debug-client/",
@ -304,7 +303,6 @@ overflow-checks = true
op-reth = { path = "crates/optimism/bin" } op-reth = { path = "crates/optimism/bin" }
reth = { path = "bin/reth" } reth = { path = "bin/reth" }
reth-basic-payload-builder = { path = "crates/payload/basic" } reth-basic-payload-builder = { path = "crates/payload/basic" }
reth-beacon-consensus = { path = "crates/consensus/beacon" }
reth-bench = { path = "bin/reth-bench" } reth-bench = { path = "bin/reth-bench" }
reth-blockchain-tree = { path = "crates/blockchain-tree" } reth-blockchain-tree = { path = "crates/blockchain-tree" }
reth-blockchain-tree-api = { path = "crates/blockchain-tree-api" } reth-blockchain-tree-api = { path = "crates/blockchain-tree-api" }

View File

@ -1,94 +0,0 @@
[package]
name = "reth-beacon-consensus"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-ethereum-consensus.workspace = true
reth-blockchain-tree-api.workspace = true
reth-codecs.workspace = true
reth-db-api.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-stages-api.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true
reth-payload-builder.workspace = true
reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-payload-validator.workspace = true
reth-prune.workspace = true
reth-static-file.workspace = true
reth-tokio-util.workspace = true
reth-engine-primitives.workspace = true
reth-network-p2p.workspace = true
reth-node-types.workspace = true
reth-chainspec = { workspace = true, optional = true }
# ethereum
alloy-primitives.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["std"] }
alloy-eips.workspace = true
alloy-consensus.workspace = true
# async
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
futures.workspace = true
# metrics
reth-metrics.workspace = true
metrics.workspace = true
# misc
tracing.workspace = true
thiserror.workspace = true
schnellru.workspace = true
itertools.workspace = true
[dev-dependencies]
# reth
reth-payload-builder = { workspace = true, features = ["test-utils"] }
reth-primitives = { workspace = true, features = ["test-utils"] }
reth-consensus = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-blockchain-tree = { workspace = true, features = ["test-utils"] }
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-evm = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-rpc-types-compat.workspace = true
reth-tracing.workspace = true
reth-downloaders.workspace = true
reth-evm-ethereum.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-config.workspace = true
reth-testing-utils.workspace = true
reth-exex-types.workspace = true
reth-prune-types.workspace = true
reth-chainspec.workspace = true
alloy-genesis.workspace = true
assert_matches.workspace = true
[features]
optimism = [
"reth-blockchain-tree/optimism",
"reth-codecs/op",
"reth-chainspec",
"reth-db-api/optimism",
"reth-db/optimism",
"reth-downloaders/optimism",
"reth-primitives/optimism",
"reth-provider/optimism",
"reth-downloaders/optimism",
]

View File

@ -1,44 +0,0 @@
use crate::engine::hooks::EngineHookError;
pub use reth_engine_primitives::BeaconForkChoiceUpdateError;
use reth_errors::{DatabaseError, RethError};
use reth_stages_api::PipelineError;
/// Beacon engine result.
pub type BeaconEngineResult<Ok> = Result<Ok, BeaconConsensusEngineError>;
/// The error type for the beacon consensus engine service
/// [`BeaconConsensusEngine`](crate::BeaconConsensusEngine)
///
/// Represents all possible error cases for the beacon consensus engine.
#[derive(Debug, thiserror::Error)]
pub enum BeaconConsensusEngineError {
/// Pipeline channel closed.
#[error("pipeline channel closed")]
PipelineChannelClosed,
/// Pipeline error.
#[error(transparent)]
Pipeline(#[from] Box<PipelineError>),
/// Pruner channel closed.
#[error("pruner channel closed")]
PrunerChannelClosed,
/// Hook error.
#[error(transparent)]
Hook(#[from] EngineHookError),
/// Common error. Wrapper around [`RethError`].
#[error(transparent)]
Common(#[from] RethError),
}
// box the pipeline error as it is a large enum.
impl From<PipelineError> for BeaconConsensusEngineError {
fn from(e: PipelineError) -> Self {
Self::Pipeline(Box::new(e))
}
}
// for convenience in the beacon engine
impl From<DatabaseError> for BeaconConsensusEngineError {
fn from(e: DatabaseError) -> Self {
Self::Common(e.into())
}
}

View File

@ -1,3 +0,0 @@
//! `BeaconConsensusEngine` external API
pub use reth_engine_primitives::BeaconConsensusEngineHandle;

View File

@ -1,390 +0,0 @@
use crate::hooks::{
EngineHook, EngineHookContext, EngineHookDBAccessLevel, EngineHookError, EngineHookEvent,
EngineHooks,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
};
use tracing::debug;
#[derive(Debug)]
pub(crate) struct PolledHook {
pub(crate) name: &'static str,
pub(crate) event: EngineHookEvent,
pub(crate) db_access_level: EngineHookDBAccessLevel,
}
/// Manages hooks under the control of the engine.
///
/// This type polls the initialized hooks one by one, respecting the DB access level
/// (i.e. [`crate::hooks::EngineHookDBAccessLevel::ReadWrite`] that enforces running at most one
/// such hook).
pub(crate) struct EngineHooksController {
/// Collection of hooks.
///
/// Hooks might be removed from the collection, and returned upon completion.
/// In the current implementation, it only happens when moved to `active_db_write_hook`.
hooks: VecDeque<Box<dyn EngineHook>>,
/// Currently running hook with DB write access, if any.
active_db_write_hook: Option<Box<dyn EngineHook>>,
}
impl EngineHooksController {
/// Creates a new [`EngineHooksController`].
pub(crate) fn new(hooks: EngineHooks) -> Self {
Self { hooks: hooks.inner.into(), active_db_write_hook: None }
}
/// Polls currently running hook with DB write access, if any.
///
/// Returns [`Poll::Ready`] if currently running hook with DB write access returned
/// an [event][`crate::hooks::EngineHookEvent`].
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. No hook with DB write access is running.
/// 2. Currently running hook with DB write access returned [`Poll::Pending`] on polling.
/// 3. Currently running hook with DB write access returned [`Poll::Ready`] on polling, but no
/// action to act upon.
pub(crate) fn poll_active_db_write_hook(
&mut self,
cx: &mut Context<'_>,
args: EngineHookContext,
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.active_db_write_hook.take() else { return Poll::Pending };
match hook.poll(cx, args)? {
Poll::Ready(event) => {
let result = PolledHook {
name: hook.name(),
event,
db_access_level: hook.db_access_level(),
};
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?result,
"Polled running hook with db write access"
);
if result.event.is_finished() {
self.hooks.push_back(hook);
} else {
self.active_db_write_hook = Some(hook);
}
return Poll::Ready(Ok(result))
}
Poll::Pending => {
self.active_db_write_hook = Some(hook);
}
}
Poll::Pending
}
/// Polls next engine from the collection.
///
/// Returns [`Poll::Ready`] if next hook returned an [event][`crate::hooks::EngineHookEvent`].
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. Next hook is [`Option::None`], i.e. taken, meaning it's currently running and has a DB
/// write access.
/// 2. Next hook needs a DB write access, but either there's another hook with DB write access
/// running, or `db_write_active` passed into arguments is `true`.
/// 3. Next hook returned [`Poll::Pending`] on polling.
/// 4. Next hook returned [`Poll::Ready`] on polling, but no action to act upon.
pub(crate) fn poll_next_hook(
&mut self,
cx: &mut Context<'_>,
args: EngineHookContext,
db_write_active: bool,
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };
let result = self.poll_next_hook_inner(cx, &mut hook, args, db_write_active);
if matches!(
result,
Poll::Ready(Ok(PolledHook {
event: EngineHookEvent::Started,
db_access_level: EngineHookDBAccessLevel::ReadWrite,
..
}))
) {
// If a read-write hook started, set `active_db_write_hook` to it
self.active_db_write_hook = Some(hook);
} else {
// Otherwise, push it back to the collection of hooks to poll it next time
self.hooks.push_back(hook);
}
result
}
fn poll_next_hook_inner(
&self,
cx: &mut Context<'_>,
hook: &mut Box<dyn EngineHook>,
args: EngineHookContext,
db_write_active: bool,
) -> Poll<Result<PolledHook, EngineHookError>> {
// Hook with DB write access level is not allowed to run due to any of the following
// reasons:
// - An already running hook with DB write access level
// - Active DB write according to passed argument
// - Missing a finalized block number. We might be on an optimistic sync scenario where we
// cannot skip the FCU with the finalized hash, otherwise CL might misbehave.
if hook.db_access_level().is_read_write() &&
(self.active_db_write_hook.is_some() ||
db_write_active ||
args.finalized_block_number.is_none())
{
return Poll::Pending
}
if let Poll::Ready(event) = hook.poll(cx, args)? {
let result =
PolledHook { name: hook.name(), event, db_access_level: hook.db_access_level() };
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?result,
"Polled next hook"
);
return Poll::Ready(Ok(result))
}
debug!(target: "consensus::engine::hooks", hook = hook.name(), "Next hook is not ready");
Poll::Pending
}
/// Returns a running hook with DB write access, if there's any.
pub(crate) fn active_db_write_hook(&self) -> Option<&dyn EngineHook> {
self.active_db_write_hook.as_ref().map(|hook| hook.as_ref())
}
}
#[cfg(test)]
mod tests {
use crate::hooks::{
EngineHook, EngineHookContext, EngineHookDBAccessLevel, EngineHookEvent, EngineHooks,
EngineHooksController,
};
use futures::poll;
use reth_errors::{RethError, RethResult};
use std::{
collections::VecDeque,
future::poll_fn,
task::{Context, Poll},
};
struct TestHook {
results: VecDeque<RethResult<EngineHookEvent>>,
name: &'static str,
access_level: EngineHookDBAccessLevel,
}
impl TestHook {
fn new_ro(name: &'static str) -> Self {
Self {
results: Default::default(),
name,
access_level: EngineHookDBAccessLevel::ReadOnly,
}
}
fn new_rw(name: &'static str) -> Self {
Self {
results: Default::default(),
name,
access_level: EngineHookDBAccessLevel::ReadWrite,
}
}
fn add_result(&mut self, result: RethResult<EngineHookEvent>) {
self.results.push_back(result);
}
}
impl EngineHook for TestHook {
fn name(&self) -> &'static str {
self.name
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
_ctx: EngineHookContext,
) -> Poll<RethResult<EngineHookEvent>> {
self.results.pop_front().map_or(Poll::Pending, Poll::Ready)
}
fn db_access_level(&self) -> EngineHookDBAccessLevel {
self.access_level
}
}
#[tokio::test]
async fn poll_active_db_write_hook() {
let mut controller = EngineHooksController::new(EngineHooks::new());
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
// No currently running hook with DB write access is set
let result = poll!(poll_fn(|cx| controller.poll_active_db_write_hook(cx, context)));
assert!(result.is_pending());
// Currently running hook with DB write access returned `Pending` on polling
controller.active_db_write_hook = Some(Box::new(TestHook::new_rw("read-write")));
let result = poll!(poll_fn(|cx| controller.poll_active_db_write_hook(cx, context)));
assert!(result.is_pending());
// Currently running hook with DB write access returned `Ready` on polling, but didn't
// return `EngineHookEvent::Finished` yet.
// Currently running hooks with DB write should still be set.
let mut hook = TestHook::new_rw("read-write");
hook.add_result(Ok(EngineHookEvent::Started));
controller.active_db_write_hook = Some(Box::new(hook));
let result = poll!(poll_fn(|cx| controller.poll_active_db_write_hook(cx, context)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.event.is_started() && polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert!(controller.active_db_write_hook.is_some());
assert!(controller.hooks.is_empty());
// Currently running hook with DB write access returned `Ready` on polling and
// `EngineHookEvent::Finished` inside.
// Currently running hooks with DB write should be moved to collection of hooks.
let mut hook = TestHook::new_rw("read-write");
hook.add_result(Ok(EngineHookEvent::Finished(Ok(()))));
controller.active_db_write_hook = Some(Box::new(hook));
let result = poll!(poll_fn(|cx| controller.poll_active_db_write_hook(cx, context)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.event.is_finished() && polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert!(controller.active_db_write_hook.is_none());
assert!(controller.hooks.pop_front().is_some());
}
#[tokio::test]
async fn poll_next_hook_db_write_active() {
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
let mut hook_rw = TestHook::new_rw("read-write");
hook_rw.add_result(Ok(EngineHookEvent::Started));
let hook_ro_name = "read-only";
let mut hook_ro = TestHook::new_ro(hook_ro_name);
hook_ro.add_result(Ok(EngineHookEvent::Started));
let mut hooks = EngineHooks::new();
hooks.add(hook_rw);
hooks.add(hook_ro);
let mut controller = EngineHooksController::new(hooks);
// Read-write hook can't be polled when external DB write is active
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, true)));
assert!(result.is_pending());
assert!(controller.active_db_write_hook.is_none());
// Read-only hook can be polled when external DB write is active
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, true)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_ro_name &&
polled_hook.event.is_started() &&
polled_hook.db_access_level.is_read_only()
}),
Poll::Ready(true)
);
}
#[tokio::test]
async fn poll_next_hook_db_write_inactive() {
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
let hook_rw_1_name = "read-write-1";
let mut hook_rw_1 = TestHook::new_rw(hook_rw_1_name);
hook_rw_1.add_result(Ok(EngineHookEvent::Started));
let hook_rw_2_name = "read-write-2";
let mut hook_rw_2 = TestHook::new_rw(hook_rw_2_name);
hook_rw_2.add_result(Ok(EngineHookEvent::Started));
let hook_ro_name = "read-only";
let mut hook_ro = TestHook::new_ro(hook_ro_name);
hook_ro.add_result(Ok(EngineHookEvent::Started));
hook_ro.add_result(Err(RethError::msg("something went wrong")));
let mut hooks = EngineHooks::new();
hooks.add(hook_rw_1);
hooks.add(hook_rw_2);
hooks.add(hook_ro);
let mut controller = EngineHooksController::new(hooks);
let hooks_len = controller.hooks.len();
// Read-write hook can be polled because external DB write is not active
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_1_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_rw_1_name &&
polled_hook.event.is_started() &&
polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert_eq!(
controller.active_db_write_hook.as_ref().map(|hook| hook.name()),
Some(hook_rw_1_name)
);
// Read-write hook cannot be polled because another read-write hook is running
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_2_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert!(result.is_pending());
// Read-only hook can be polled in parallel with already running read-write hook
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_ro_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_ro_name &&
polled_hook.event.is_started() &&
polled_hook.db_access_level.is_read_only()
}),
Poll::Ready(true)
);
// Read-write hook still cannot be polled because another read-write hook is running
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_2_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert!(result.is_pending());
// Read-only hook has finished with error
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_ro_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(result.map(|result| { result.is_err() }), Poll::Ready(true));
assert!(controller.active_db_write_hook.is_some());
assert_eq!(controller.hooks.len(), hooks_len - 1)
}
}

View File

@ -1,129 +0,0 @@
use alloy_primitives::BlockNumber;
use reth_errors::{RethError, RethResult};
use std::{
fmt,
task::{Context, Poll},
};
mod controller;
pub(crate) use controller::{EngineHooksController, PolledHook};
mod prune;
pub use prune::PruneHook;
mod static_file;
pub use static_file::StaticFileHook;
/// Collection of [engine hooks][`EngineHook`].
#[derive(Default)]
pub struct EngineHooks {
inner: Vec<Box<dyn EngineHook>>,
}
impl fmt::Debug for EngineHooks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EngineHooks").field("inner", &self.inner.len()).finish()
}
}
impl EngineHooks {
/// Creates a new empty collection of [engine hooks][`EngineHook`].
pub fn new() -> Self {
Self { inner: Vec::new() }
}
/// Adds a new [engine hook][`EngineHook`] to the collection.
pub fn add<H: EngineHook>(&mut self, hook: H) {
self.inner.push(Box::new(hook))
}
}
/// Hook that will be run during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub trait EngineHook: Send + Sync + 'static {
/// Returns a human-readable name for the hook.
fn name(&self) -> &'static str;
/// Advances the hook execution, emitting an [event][`EngineHookEvent`].
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineHookContext,
) -> Poll<RethResult<EngineHookEvent>>;
/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
fn db_access_level(&self) -> EngineHookDBAccessLevel;
}
/// Engine context passed to the [hook polling function][`EngineHook::poll`].
#[derive(Copy, Clone, Debug)]
pub struct EngineHookContext {
/// Tip block number.
pub tip_block_number: BlockNumber,
/// Finalized block number, if known.
pub finalized_block_number: Option<BlockNumber>,
}
/// An event emitted when [hook][`EngineHook`] is polled.
#[derive(Debug)]
pub enum EngineHookEvent {
/// Hook is not ready.
///
/// If this is returned, the hook is idle.
NotReady,
/// Hook started.
///
/// If this is returned, the hook is running.
Started,
/// Hook finished.
///
/// If this is returned, the hook is idle.
Finished(Result<(), EngineHookError>),
}
impl EngineHookEvent {
/// Returns `true` if the event is [`EngineHookEvent::Started`].
pub const fn is_started(&self) -> bool {
matches!(self, Self::Started)
}
/// Returns `true` if the event is [`EngineHookEvent::Finished`].
pub const fn is_finished(&self) -> bool {
matches!(self, Self::Finished(_))
}
}
/// An error returned by [hook][`EngineHook`].
#[derive(Debug, thiserror::Error)]
pub enum EngineHookError {
/// Hook channel closed.
#[error("hook channel closed")]
ChannelClosed,
/// Common error. Wrapper around [`RethError`].
#[error(transparent)]
Common(#[from] RethError),
/// An internal error occurred.
#[error(transparent)]
Internal(#[from] Box<dyn core::error::Error + Send + Sync>),
}
/// Level of database access the hook needs for execution.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum EngineHookDBAccessLevel {
/// Read-only database access.
ReadOnly,
/// Read-write database access.
ReadWrite,
}
impl EngineHookDBAccessLevel {
/// Returns `true` if the hook needs read-only access to the database.
pub const fn is_read_only(&self) -> bool {
matches!(self, Self::ReadOnly)
}
/// Returns `true` if the hook needs read-write access to the database.
pub const fn is_read_write(&self) -> bool {
matches!(self, Self::ReadWrite)
}
}

View File

@ -1,203 +0,0 @@
//! Prune hook for the engine implementation.
use crate::{
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
hooks::EngineHookDBAccessLevel,
};
use alloy_primitives::BlockNumber;
use futures::FutureExt;
use metrics::Counter;
use reth_errors::{RethError, RethResult};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::{
fmt::{self, Debug},
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
/// Manages pruning under the control of the engine.
///
/// This type controls the [Pruner].
pub struct PruneHook<PF: DatabaseProviderFactory> {
/// The current state of the pruner.
pruner_state: PrunerState<PF>,
/// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>,
metrics: Metrics,
}
impl<PF> fmt::Debug for PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: fmt::Debug> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PruneHook")
.field("pruner_state", &self.pruner_state)
.field("metrics", &self.metrics)
.finish()
}
}
impl<PF: DatabaseProviderFactory> PruneHook<PF> {
/// Create a new instance
pub fn new(
pruner: Pruner<PF::ProviderRW, PF>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
pruner_state: PrunerState::Idle(Some(pruner)),
pruner_task_spawner,
metrics: Metrics::default(),
}
}
/// Advances the pruner state.
///
/// This checks for the result in the channel, or returns pending if the pruner is idle.
fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<RethResult<EngineHookEvent>> {
let result = match self.pruner_state {
PrunerState::Idle(_) => return Poll::Pending,
PrunerState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let event = match result {
Ok((pruner, result)) => {
self.pruner_state = PrunerState::Idle(Some(pruner));
match result {
Ok(_) => EngineHookEvent::Finished(Ok(())),
Err(err) => EngineHookEvent::Finished(Err(err.into())),
}
}
Err(_) => {
// failed to receive the pruner
EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
}
};
Poll::Ready(Ok(event))
}
}
impl<PF> PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
+ 'static,
{
/// This will try to spawn the pruner if it is idle:
/// 1. Check if pruning is needed through [`Pruner::is_pruning_needed`].
///
/// 2.1. If pruning is needed, pass tip block number to the [`Pruner::run`] and spawn it in a
/// separate task. Set pruner state to [`PrunerState::Running`].
/// 2.2. If pruning is not needed, set pruner state back to [`PrunerState::Idle`].
///
/// If pruner is already running, do nothing.
fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EngineHookEvent> {
match &mut self.pruner_state {
PrunerState::Idle(pruner) => {
let mut pruner = pruner.take()?;
// Check tip for pruning
if pruner.is_pruning_needed(tip_block_number) {
let (tx, rx) = oneshot::channel();
self.pruner_task_spawner.spawn_critical_blocking(
"pruner task",
Box::pin(async move {
let result = pruner.run(tip_block_number);
let _ = tx.send((pruner, result));
}),
);
self.metrics.runs_total.increment(1);
self.pruner_state = PrunerState::Running(rx);
Some(EngineHookEvent::Started)
} else {
self.pruner_state = PrunerState::Idle(Some(pruner));
Some(EngineHookEvent::NotReady)
}
}
PrunerState::Running(_) => None,
}
}
}
impl<PF> EngineHook for PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
+ 'static,
{
fn name(&self) -> &'static str {
"Prune"
}
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineHookContext,
) -> Poll<RethResult<EngineHookEvent>> {
// Try to spawn a pruner
match self.try_spawn_pruner(ctx.tip_block_number) {
Some(EngineHookEvent::NotReady) => return Poll::Pending,
Some(event) => return Poll::Ready(Ok(event)),
None => (),
}
// Poll pruner and check its status
self.poll_pruner(cx)
}
fn db_access_level(&self) -> EngineHookDBAccessLevel {
EngineHookDBAccessLevel::ReadWrite
}
}
/// The possible pruner states within the sync controller.
///
/// [`PrunerState::Idle`] means that the pruner is currently idle.
/// [`PrunerState::Running`] means that the pruner is currently running.
///
/// NOTE: The differentiation between these two states is important, because when the pruner is
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PrunerState<PF: DatabaseProviderFactory> {
/// Pruner is idle.
Idle(Option<Pruner<PF::ProviderRW, PF>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult<PF::ProviderRW, PF>>),
}
impl<PF> fmt::Debug for PrunerState<PF>
where
PF: DatabaseProviderFactory<ProviderRW: Debug> + Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Idle(f0) => f.debug_tuple("Idle").field(&f0).finish(),
Self::Running(f0) => f.debug_tuple("Running").field(&f0).finish(),
}
}
}
#[derive(reth_metrics::Metrics)]
#[metrics(scope = "consensus.engine.prune")]
struct Metrics {
/// The number of times the pruner was run.
runs_total: Counter,
}
impl From<PrunerError> for EngineHookError {
fn from(err: PrunerError) -> Self {
match err {
PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
Self::Internal(Box::new(err))
}
PrunerError::Database(err) => RethError::Database(err).into(),
PrunerError::Provider(err) => RethError::Provider(err).into(),
}
}
}

View File

@ -1,209 +0,0 @@
//! `StaticFile` hook for the engine implementation.
use crate::{
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
hooks::EngineHookDBAccessLevel,
};
use alloy_primitives::BlockNumber;
use futures::FutureExt;
use reth_codecs::Compact;
use reth_db_api::table::Value;
use reth_errors::RethResult;
use reth_primitives::{static_file::HighestStaticFiles, NodePrimitives};
use reth_provider::{
BlockReader, ChainStateBlockReader, DatabaseProviderFactory, StageCheckpointReader,
StaticFileProviderFactory,
};
use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
/// Manages producing static files under the control of the engine.
///
/// This type controls the [`StaticFileProducer`].
#[derive(Debug)]
pub struct StaticFileHook<Provider> {
/// The current state of the `static_file_producer`.
state: StaticFileProducerState<Provider>,
/// The type that can spawn the `static_file_producer` task.
task_spawner: Box<dyn TaskSpawner>,
}
impl<Provider> StaticFileHook<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<
Provider: StaticFileProviderFactory<
Primitives: NodePrimitives<
SignedTx: Value + Compact,
BlockHeader: Value + Compact,
Receipt: Value + Compact,
>,
> + StageCheckpointReader
+ BlockReader
+ ChainStateBlockReader,
> + 'static,
{
/// Create a new instance
pub fn new(
static_file_producer: StaticFileProducer<Provider>,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
}
/// Advances the `static_file_producer` state.
///
/// This checks for the result in the channel, or returns pending if the `static_file_producer`
/// is idle.
fn poll_static_file_producer(
&mut self,
cx: &mut Context<'_>,
) -> Poll<RethResult<EngineHookEvent>> {
let result = match self.state {
StaticFileProducerState::Idle(_) => return Poll::Pending,
StaticFileProducerState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let event = match result {
Ok((static_file_producer, result)) => {
self.state = StaticFileProducerState::Idle(Some(static_file_producer));
match result {
Ok(_) => EngineHookEvent::Finished(Ok(())),
Err(err) => EngineHookEvent::Finished(Err(EngineHookError::Common(err.into()))),
}
}
Err(_) => {
// failed to receive the static_file_producer
EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
}
};
Poll::Ready(Ok(event))
}
/// This will try to spawn the `static_file_producer` if it is idle:
/// 1. Check if producing static files is needed through
/// [`StaticFileProducer::get_static_file_targets`](reth_static_file::StaticFileProducerInner::get_static_file_targets)
/// and then [`StaticFileTargets::any`](reth_static_file::StaticFileTargets::any).
///
/// 2.1. If producing static files is needed, pass static file request to the
/// [`StaticFileProducer::run`](reth_static_file::StaticFileProducerInner::run) and
/// spawn it in a separate task. Set static file producer state to
/// [`StaticFileProducerState::Running`].
/// 2.2. If producing static files is not needed, set static file producer state back to
/// [`StaticFileProducerState::Idle`].
///
/// If `static_file_producer` is already running, do nothing.
fn try_spawn_static_file_producer(
&mut self,
finalized_block_number: BlockNumber,
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
StaticFileProducerState::Idle(static_file_producer) => {
let Some(static_file_producer) = static_file_producer.take() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
return Ok(None)
};
let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};
let finalized_block_number = locked_static_file_producer
.last_finalized_block()?
.map(|on_disk| finalized_block_number.min(on_disk))
.unwrap_or(finalized_block_number);
let targets =
locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;
// Check if the moving data to static files has been requested.
if targets.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(
"static_file_producer task",
Box::pin(async move {
let result = locked_static_file_producer.run(targets);
let _ = tx.send((static_file_producer, result));
}),
);
self.state = StaticFileProducerState::Running(rx);
Some(EngineHookEvent::Started)
} else {
self.state = StaticFileProducerState::Idle(Some(static_file_producer));
Some(EngineHookEvent::NotReady)
}
}
StaticFileProducerState::Running(_) => None,
})
}
}
impl<Provider> EngineHook for StaticFileHook<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<
Provider: StaticFileProviderFactory<
Primitives: NodePrimitives<
SignedTx: Value + Compact,
BlockHeader: Value + Compact,
Receipt: Value + Compact,
>,
> + StageCheckpointReader
+ BlockReader
+ ChainStateBlockReader,
> + 'static,
{
fn name(&self) -> &'static str {
"StaticFile"
}
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineHookContext,
) -> Poll<RethResult<EngineHookEvent>> {
let Some(finalized_block_number) = ctx.finalized_block_number else {
trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is not available");
return Poll::Pending
};
// Try to spawn a static_file_producer
match self.try_spawn_static_file_producer(finalized_block_number)? {
Some(EngineHookEvent::NotReady) => return Poll::Pending,
Some(event) => return Poll::Ready(Ok(event)),
None => (),
}
// Poll static_file_producer and check its status
self.poll_static_file_producer(cx)
}
fn db_access_level(&self) -> EngineHookDBAccessLevel {
EngineHookDBAccessLevel::ReadOnly
}
}
/// The possible `static_file_producer` states within the sync controller.
///
/// [`StaticFileProducerState::Idle`] means that the static file producer is currently idle.
/// [`StaticFileProducerState::Running`] means that the static file producer is currently running.
#[derive(Debug)]
enum StaticFileProducerState<Provider> {
/// [`StaticFileProducer`] is idle.
Idle(Option<StaticFileProducer<Provider>>),
/// [`StaticFileProducer`] is running and waiting for a response
Running(oneshot::Receiver<StaticFileProducerWithResult<Provider>>),
}

View File

@ -1,125 +0,0 @@
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::B256;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use schnellru::{ByLength, LruMap};
use std::fmt::Debug;
use tracing::warn;
/// The max hit counter for invalid headers in the cache before it is forcefully evicted.
///
/// In other words, if a header is referenced more than this number of times, it will be evicted to
/// allow for reprocessing.
const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Keeps track of invalid headers.
#[derive(Debug)]
pub struct InvalidHeaderCache {
/// This maps a header hash to a reference to its invalid ancestor.
headers: LruMap<B256, HeaderEntry>,
/// Metrics for the cache.
metrics: InvalidHeaderCacheMetrics,
}
impl InvalidHeaderCache {
/// Invalid header cache constructor.
pub fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
}
fn insert_entry(&mut self, hash: B256, header: BlockWithParent) {
self.headers.insert(hash, HeaderEntry { header, hit_count: 0 });
}
/// Returns the invalid ancestor's header if it exists in the cache.
///
/// If this is called, the hit count for the entry is incremented.
/// If the hit count exceeds the threshold, the entry is evicted and `None` is returned.
pub fn get(&mut self, hash: &B256) -> Option<BlockWithParent> {
{
let entry = self.headers.get(hash)?;
entry.hit_count += 1;
if entry.hit_count < INVALID_HEADER_HIT_EVICTION_THRESHOLD {
return Some(entry.header)
}
}
// if we get here, the entry has been hit too many times, so we evict it
self.headers.remove(hash);
self.metrics.hit_evictions.increment(1);
None
}
/// Inserts an invalid block into the cache, with a given invalid ancestor.
pub fn insert_with_invalid_ancestor(
&mut self,
header_hash: B256,
invalid_ancestor: BlockWithParent,
) {
if self.get(&header_hash).is_none() {
warn!(target: "consensus::engine", hash=?header_hash, ?invalid_ancestor, "Bad block with existing invalid ancestor");
self.insert_entry(header_hash, invalid_ancestor);
// update metrics
self.metrics.known_ancestor_inserts.increment(1);
self.metrics.count.set(self.headers.len() as f64);
}
}
/// Inserts an invalid ancestor into the map.
pub fn insert(&mut self, invalid_ancestor: BlockWithParent) {
if self.get(&invalid_ancestor.block.hash).is_none() {
warn!(target: "consensus::engine", ?invalid_ancestor, "Bad block with hash");
self.insert_entry(invalid_ancestor.block.hash, invalid_ancestor);
// update metrics
self.metrics.unique_inserts.increment(1);
self.metrics.count.set(self.headers.len() as f64);
}
}
}
struct HeaderEntry {
/// Keeps track how many times this header has been hit.
hit_count: u8,
/// The actual header entry
header: BlockWithParent,
}
/// Metrics for the invalid headers cache.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon.invalid_headers")]
struct InvalidHeaderCacheMetrics {
/// The total number of invalid headers in the cache.
count: Gauge,
/// The number of inserts with a known ancestor.
known_ancestor_inserts: Counter,
/// The number of unique invalid header inserts (i.e. without a known ancestor).
unique_inserts: Counter,
/// The number of times a header was evicted from the cache because it was hit too many times.
hit_evictions: Counter,
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use reth_primitives::SealedHeader;
#[test]
fn test_hit_eviction() {
let mut cache = InvalidHeaderCache::new(10);
let header = Header::default();
let header = SealedHeader::seal(header);
cache.insert(header.block_with_parent());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0);
for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD {
assert!(cache.get(&header.hash()).is_some());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, hit);
}
assert!(cache.get(&header.hash()).is_none());
}
}

View File

@ -1,32 +0,0 @@
use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
/// Beacon consensus engine metrics.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineMetrics {
/// The number of times the pipeline was run.
pub(crate) pipeline_runs: Counter,
/// The total count of forkchoice updated messages received.
pub(crate) forkchoice_updated_messages: Counter,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// Latency for making canonical already canonical block
pub(crate) make_canonical_already_canonical_latency: Histogram,
/// Latency for making canonical committed block
pub(crate) make_canonical_committed_latency: Histogram,
/// Latency for making canonical returns error
pub(crate) make_canonical_error_latency: Histogram,
/// Latency for all making canonical results
pub(crate) make_canonical_latency: Histogram,
}
/// Metrics for the `EngineSyncController`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineSyncMetrics {
/// How many blocks are currently being downloaded.
pub(crate) active_block_downloads: Gauge,
}

File diff suppressed because it is too large Load Diff

View File

@ -1,672 +0,0 @@
//! Sync management for the engine implementation.
use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent,
ConsensusEngineLiveSyncProgress, EthBeaconConsensus,
};
use alloy_consensus::Header;
use alloy_primitives::{BlockNumber, B256};
use futures::FutureExt;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
};
use reth_node_types::{BodyTy, HeaderTy};
use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock};
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventSender;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tracing::trace;
/// Manages syncing under the control of the engine.
///
/// This type controls the [Pipeline] and supports (single) full block downloads.
///
/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network
/// [`EngineSyncEvent::FetchedFullBlock`] until the pipeline is idle to prevent commits to the
/// database while the pipeline is still active.
pub(crate) struct EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient,
{
/// A downloader that can download full blocks from the network.
full_block_client: FullBlockClient<Client>,
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<N>,
/// Pending target block for the pipeline to sync
pending_pipeline_target: Option<PipelineTarget>,
/// In-flight full block requests in progress.
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Sender for engine events.
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
/// Max block after which the consensus engine would terminate the sync. Used for debugging
/// purposes.
max_block: Option<BlockNumber>,
/// Engine sync metrics.
metrics: EngineSyncMetrics,
}
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient,
{
/// Create a new instance
pub(crate) fn new(
pipeline: Pipeline<N>,
client: Client,
pipeline_task_spawner: Box<dyn TaskSpawner>,
max_block: Option<BlockNumber>,
chain_spec: Arc<N::ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
client,
Arc::new(EthBeaconConsensus::new(chain_spec)),
),
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None,
inflight_full_block_requests: Vec::new(),
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
event_sender,
max_block,
metrics: EngineSyncMetrics::default(),
}
}
}
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
{
/// Sets the metrics for the active downloads
fn update_block_download_metrics(&self) {
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
// TODO: full block range metrics
}
/// Sets the max block value for testing
#[cfg(test)]
pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
self.max_block = Some(block);
}
/// Cancels all download requests that are in progress and buffered blocks.
pub(crate) fn clear_block_download_requests(&mut self) {
self.inflight_full_block_requests.clear();
self.inflight_block_range_requests.clear();
self.range_buffered_blocks.clear();
self.update_block_download_metrics();
}
/// Cancels the full block request with the given hash.
pub(crate) fn cancel_full_block_request(&mut self, hash: B256) {
self.inflight_full_block_requests.retain(|req| *req.hash() != hash);
self.update_block_download_metrics();
}
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is idle.
pub(crate) const fn is_pipeline_idle(&self) -> bool {
self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is active.
pub(crate) const fn is_pipeline_active(&self) -> bool {
!self.is_pipeline_idle()
}
/// Returns true if there's already a request for the given hash.
pub(crate) fn is_inflight_request(&self, hash: B256) -> bool {
self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
}
/// Starts requesting a range of blocks from the network, in reverse from the given hash.
///
/// If the `count` is 1, this will use the `download_full_block` method instead, because it
/// downloads headers and bodies for the block concurrently.
pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
if count == 1 {
self.download_full_block(hash);
} else {
trace!(
target: "consensus::engine",
?hash,
?count,
"start downloading full block range."
);
// notify listeners that we're downloading a block range
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
},
));
let request = self.full_block_client.get_full_block_range(hash, count);
self.inflight_block_range_requests.push(request);
}
// // TODO: need more metrics for block ranges
// self.update_block_download_metrics();
}
/// Starts requesting a full block from the network.
///
/// Returns `true` if the request was started, `false` if there's already a request for the
/// given hash.
pub(crate) fn download_full_block(&mut self, hash: B256) -> bool {
if self.is_inflight_request(hash) {
return false
}
trace!(
target: "consensus::engine::sync",
?hash,
"Start downloading full block"
);
// notify listeners that we're downloading a block
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
},
));
let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);
self.update_block_download_metrics();
true
}
/// Sets a new target to sync the pipeline to.
///
/// But ensures the target is not the zero hash.
pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
if target.sync_target().is_some_and(|target| target.is_zero()) {
trace!(
target: "consensus::engine::sync",
"Pipeline target cannot be zero hash."
);
// precaution to never sync to the zero hash
return
}
self.pending_pipeline_target = Some(target);
}
/// Check if the engine reached max block as specified by `max_block` parameter.
///
/// Note: this is mainly for debugging purposes.
pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
let has_reached_max_block = self.max_block.is_some_and(|target| progress >= target);
if has_reached_max_block {
trace!(
target: "consensus::engine::sync",
?progress,
max_block = ?self.max_block,
"Consensus engine reached max block"
);
}
has_reached_max_block
}
/// Advances the pipeline state.
///
/// This checks for the result in the channel, or returns pending if the pipeline is idle.
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
let res = match self.pipeline_state {
PipelineState::Idle(_) => return Poll::Pending,
PipelineState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let ev = match res {
Ok((pipeline, result)) => {
let minimum_block_number = pipeline.minimum_block_number();
let reached_max_block =
self.has_reached_max_block(minimum_block_number.unwrap_or_default());
self.pipeline_state = PipelineState::Idle(Some(pipeline));
EngineSyncEvent::PipelineFinished { result, reached_max_block }
}
Err(_) => {
// failed to receive the pipeline
EngineSyncEvent::PipelineTaskDropped
}
};
Poll::Ready(ev)
}
/// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
/// run continuously.
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent<N::Primitives>> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take()?;
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
);
self.pipeline_state = PipelineState::Running(rx);
// we also clear any pending full block requests because we expect them to be
// outdated (included in the range the pipeline is syncing anyway)
self.clear_block_download_requests();
Some(EngineSyncEvent::PipelineStarted(Some(target)))
}
PipelineState::Running(_) => None,
}
}
/// Advances the sync process.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
// try to spawn a pipeline if a target is set
if let Some(event) = self.try_spawn_pipeline() {
return Poll::Ready(event)
}
// make sure we poll the pipeline if it's active, and return any ready pipeline events
if !self.is_pipeline_idle() {
// advance the pipeline
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
// advance all full block requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
if let Poll::Ready(block) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
} else {
// still pending
self.inflight_full_block_requests.push(request);
}
}
// advance all full block range requests
for idx in (0..self.inflight_block_range_requests.len()).rev() {
let mut request = self.inflight_block_range_requests.swap_remove(idx);
if let Poll::Ready(blocks) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
self.range_buffered_blocks
.extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
} else {
// still pending
self.inflight_block_range_requests.push(request);
}
}
self.update_block_download_metrics();
// drain an element of the block buffer if there are any
if let Some(block) = self.range_buffered_blocks.pop() {
// peek ahead and pop duplicates
while let Some(peek) = self.range_buffered_blocks.peek_mut() {
if peek.0 .0.hash() == block.0 .0.hash() {
PeekMut::pop(peek);
} else {
break
}
}
return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
}
Poll::Pending
}
}
/// A wrapper type around [`SealedBlock`] that implements the [Ord] trait by block number.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlock<H = Header, B = BlockBody>(SealedBlock<H, B>);
impl<H, B> PartialOrd for OrderedSealedBlock<H, B>
where
H: reth_primitives_traits::BlockHeader + 'static,
B: reth_primitives_traits::BlockBody + 'static,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<H, B> Ord for OrderedSealedBlock<H, B>
where
H: reth_primitives_traits::BlockHeader + 'static,
B: reth_primitives_traits::BlockBody + 'static,
{
fn cmp(&self, other: &Self) -> Ordering {
self.0.number().cmp(&other.0.number())
}
}
/// The event type emitted by the [`EngineSyncController`].
#[derive(Debug)]
pub(crate) enum EngineSyncEvent<N: NodePrimitives = EthPrimitives> {
/// A full block has been downloaded from the network.
FetchedFullBlock(SealedBlock<N::BlockHeader, N::BlockBody>),
/// Pipeline started syncing
///
/// This is none if the pipeline is triggered without a specific target.
PipelineStarted(Option<PipelineTarget>),
/// Pipeline finished
///
/// If this is returned, the pipeline is idle.
PipelineFinished {
/// Final result of the pipeline run.
result: Result<ControlFlow, PipelineError>,
/// Whether the pipeline reached the configured `max_block`.
///
/// Note: this is only relevant in debugging scenarios.
reached_max_block: bool,
},
/// Pipeline task was dropped after it was started, unable to receive it because channel
/// closed. This would indicate a panicked pipeline task
PipelineTaskDropped,
}
/// The possible pipeline states within the sync controller.
///
/// [`PipelineState::Idle`] means that the pipeline is currently idle.
/// [`PipelineState::Running`] means that the pipeline is currently running.
///
/// NOTE: The differentiation between these two states is important, because when the pipeline is
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PipelineState<N: ProviderNodeTypes> {
/// Pipeline is idle.
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<N: ProviderNodeTypes> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient};
use reth_primitives::{BlockBody, SealedHeader};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome,
};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages_api::StageCheckpoint;
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, ops::Range};
use tokio::sync::watch;
struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<ExecutionOutcome>,
max_block: Option<BlockNumber>,
}
impl TestPipelineBuilder {
/// Create a new [`TestPipelineBuilder`].
const fn new() -> Self {
Self {
pipeline_exec_outputs: VecDeque::new(),
executor_results: Vec::new(),
max_block: None,
}
}
/// Set the pipeline execution outputs to use for the test consensus engine.
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}
/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_results = executor_results;
self
}
/// Sets the max block for the pipeline to run.
#[allow(dead_code)]
const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Builds the pipeline.
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<MockNodeTypesWithDB> {
reth_tracing::init_test_tracing();
// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
if let Some(max_block) = self.max_block {
pipeline = pipeline.with_max_block(max_block);
}
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
pipeline.build(provider_factory, static_file_producer)
}
}
struct TestSyncControllerBuilder<Client> {
max_block: Option<BlockNumber>,
client: Option<Client>,
}
impl<Client> TestSyncControllerBuilder<Client> {
/// Create a new [`TestSyncControllerBuilder`].
const fn new() -> Self {
Self { max_block: None, client: None }
}
/// Sets the max block for the pipeline to run.
#[allow(dead_code)]
const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Sets the client to use for network operations.
fn with_client(mut self, client: Client) -> Self {
self.client = Some(client);
self
}
/// Builds the sync controller.
fn build<N>(
self,
pipeline: Pipeline<N>,
chain_spec: Arc<N::ChainSpec>,
) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
where
N: ProviderNodeTypes,
Client: EthBlockClient + 'static,
{
let client = self
.client
.map(Either::Left)
.unwrap_or_else(|| Either::Right(TestFullBlockClient::default()));
EngineSyncController::new(
pipeline,
client,
Box::<TokioTaskExecutor>::default(),
self.max_block,
chain_spec,
Default::default(),
)
}
}
#[tokio::test]
async fn pipeline_started_after_setting_target() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
insert_headers_into_client(&client, SealedHeader::default(), 0..10);
// force the pipeline to be "done" after 5 blocks
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(5),
done: true,
})]))
.build(chain_spec.clone());
let mut sync_controller = TestSyncControllerBuilder::new()
.with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here");
sync_controller.set_pipeline_sync_target(tip.hash().into());
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_event = poll!(sync_future);
// can assert that the first event here is PipelineStarted because we set the sync target,
// and we should get Ready because the pipeline should be spawned immediately
assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
assert_eq!(target.sync_target().unwrap(), tip.hash());
});
// the next event should be the pipeline finishing in a good state
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 }));
// no max block configured
assert!(!reached_max_block);
});
}
fn insert_headers_into_client(
client: &TestFullBlockClient,
genesis_header: SealedHeader,
range: Range<usize>,
) {
let mut sealed_header = genesis_header;
let body = BlockBody::default();
for _ in range {
let (mut header, hash) = sealed_header.split();
// update to the next header
header.parent_hash = hash;
header.number += 1;
header.timestamp += 1;
sealed_header = SealedHeader::seal(header);
client.insert(sealed_header.clone(), body.clone());
}
}
#[tokio::test]
async fn controller_sends_range_request() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
};
let header = SealedHeader::seal(header);
insert_headers_into_client(&client, header, 0..10);
// set up a pipeline
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let mut sync_controller = TestSyncControllerBuilder::new()
.with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here");
// call the download range method
sync_controller.download_block_range(tip.hash(), tip.number);
// ensure we have one in flight range request
assert_eq!(sync_controller.inflight_block_range_requests.len(), 1);
// ensure the range request is made correctly
let first_req = sync_controller.inflight_block_range_requests.first().unwrap();
assert_eq!(first_req.start_hash(), tip.hash());
assert_eq!(first_req.count(), tip.number);
// ensure they are in ascending order
for num in 1..=10 {
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => {
assert_eq!(block.number, num);
});
}
}
}

View File

@ -1,467 +0,0 @@
#![allow(missing_docs)]
use crate::{
engine::hooks::PruneHook, hooks::EngineHooks, BeaconConsensusEngine,
BeaconConsensusEngineError, BeaconConsensusEngineHandle, BeaconForkChoiceUpdateError,
EthBeaconConsensus, MIN_BLOCKS_FOR_PIPELINE_RUN,
};
use alloy_primitives::{BlockNumber, B256};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_chainspec::ChainSpec;
use reth_config::config::StageConfig;
use reth_consensus::{test_utils::TestConsensus, ConsensusError, FullConsensus};
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_engine_primitives::{BeaconOnNewPayloadError, EngineApiMessageVersion};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::{either::Either, test_utils::MockExecutorProvider};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::{
sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClient, EthBlockClient,
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::SealedHeader;
use reth_provider::{
providers::BlockchainProvider,
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome,
};
use reth_prune::Pruner;
use reth_prune_types::PruneModes;
use reth_stages::{sets::DefaultStages, test_utils::TestStages, ExecOutput, Pipeline, StageError};
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::{oneshot, watch};
type DatabaseEnv = TempDatabase<DE>;
type TestBeaconConsensusEngine<Client> = BeaconConsensusEngine<
MockNodeTypesWithDB,
BlockchainProvider<MockNodeTypesWithDB>,
Arc<Either<Client, NoopFullBlockClient>>,
>;
#[derive(Debug)]
pub struct TestEnv<DB> {
pub db: DB,
// Keep the tip receiver around, so it's not dropped.
#[allow(dead_code)]
tip_rx: watch::Receiver<B256>,
engine_handle: BeaconConsensusEngineHandle<EthEngineTypes>,
}
impl<DB> TestEnv<DB> {
const fn new(
db: DB,
tip_rx: watch::Receiver<B256>,
engine_handle: BeaconConsensusEngineHandle<EthEngineTypes>,
) -> Self {
Self { db, tip_rx, engine_handle }
}
pub async fn send_new_payload<T: Into<ExecutionPayload>>(
&self,
payload: T,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
self.engine_handle.new_payload(payload.into(), sidecar).await
}
/// Sends the `ExecutionPayload` message to the consensus engine and retries if the engine
/// is syncing.
pub async fn send_new_payload_retry_on_syncing<T: Into<ExecutionPayload>>(
&self,
payload: T,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let payload: ExecutionPayload = payload.into();
loop {
let result = self.send_new_payload(payload.clone(), sidecar.clone()).await?;
if !result.is_syncing() {
return Ok(result)
}
}
}
pub async fn send_forkchoice_updated(
&self,
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
self.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await
}
/// Sends the `ForkchoiceUpdated` message to the consensus engine and retries if the engine
/// is syncing.
pub async fn send_forkchoice_retry_on_syncing(
&self,
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
loop {
let result = self
.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await?;
if !result.is_syncing() {
return Ok(result)
}
}
}
}
// TODO: add with_consensus in case we want to use the TestConsensus purposeful failure - this
// would require similar patterns to how we use with_client and the downloader
/// Represents either a real consensus engine, or a test consensus engine.
#[derive(Debug, Default)]
enum TestConsensusConfig {
/// Test consensus engine
#[default]
Test,
/// Real consensus engine
Real,
}
/// Represents either test pipeline outputs, or real pipeline configuration.
#[derive(Debug)]
enum TestPipelineConfig {
/// Test pipeline outputs.
Test(VecDeque<Result<ExecOutput, StageError>>),
/// Real pipeline configuration.
Real,
}
impl Default for TestPipelineConfig {
fn default() -> Self {
Self::Test(VecDeque::new())
}
}
/// Represents either test executor results, or real executor configuration.
#[derive(Debug)]
enum TestExecutorConfig {
/// Test executor results.
Test(Vec<ExecutionOutcome>),
/// Real executor configuration.
Real,
}
impl Default for TestExecutorConfig {
fn default() -> Self {
Self::Test(Vec::new())
}
}
/// The basic configuration for a `TestConsensusEngine`, without generics for the client or
/// consensus engine.
#[derive(Debug)]
pub struct TestConsensusEngineBuilder {
chain_spec: Arc<ChainSpec>,
pipeline_config: TestPipelineConfig,
executor_config: TestExecutorConfig,
pipeline_run_threshold: Option<u64>,
max_block: Option<BlockNumber>,
consensus: TestConsensusConfig,
}
impl TestConsensusEngineBuilder {
/// Create a new `TestConsensusEngineBuilder` with the given `ChainSpec`.
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self {
chain_spec,
pipeline_config: Default::default(),
executor_config: Default::default(),
pipeline_run_threshold: None,
max_block: None,
consensus: Default::default(),
}
}
/// Set the pipeline execution outputs to use for the test consensus engine.
pub fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_config = TestPipelineConfig::Test(pipeline_exec_outputs);
self
}
/// Set the executor results to use for the test consensus engine.
pub fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_config = TestExecutorConfig::Test(executor_results);
self
}
/// Sets the max block for the pipeline to run.
pub const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Uses the real pipeline instead of a pipeline with empty exec outputs.
pub fn with_real_pipeline(mut self) -> Self {
self.pipeline_config = TestPipelineConfig::Real;
self
}
/// Uses the real executor instead of a executor with empty results.
pub fn with_real_executor(mut self) -> Self {
self.executor_config = TestExecutorConfig::Real;
self
}
/// Uses a real consensus engine instead of a test consensus engine.
pub const fn with_real_consensus(mut self) -> Self {
self.consensus = TestConsensusConfig::Real;
self
}
/// Disables blockchain tree driven sync. This is the same as setting the pipeline run
/// threshold to 0.
pub const fn disable_blockchain_tree_sync(mut self) -> Self {
self.pipeline_run_threshold = Some(0);
self
}
/// Sets the client to use for network operations.
#[allow(dead_code)]
pub const fn with_client<Client>(
self,
client: Client,
) -> NetworkedTestConsensusEngineBuilder<Client>
where
Client: EthBlockClient + 'static,
{
NetworkedTestConsensusEngineBuilder { base_config: self, client: Some(client) }
}
/// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
pub fn build(
self,
) -> (TestBeaconConsensusEngine<NoopFullBlockClient>, TestEnv<Arc<DatabaseEnv>>) {
let networked = NetworkedTestConsensusEngineBuilder { base_config: self, client: None };
networked.build()
}
}
/// A builder for `TestConsensusEngine`, allows configuration of mocked pipeline outputs and
/// mocked executor results.
///
/// This optionally includes a client for network operations.
#[derive(Debug)]
pub struct NetworkedTestConsensusEngineBuilder<Client> {
base_config: TestConsensusEngineBuilder,
client: Option<Client>,
}
impl<Client> NetworkedTestConsensusEngineBuilder<Client>
where
Client: EthBlockClient + 'static,
{
/// Set the pipeline execution outputs to use for the test consensus engine.
#[allow(dead_code)]
pub fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.base_config.pipeline_config = TestPipelineConfig::Test(pipeline_exec_outputs);
self
}
/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
pub fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.base_config.executor_config = TestExecutorConfig::Test(executor_results);
self
}
/// Sets the max block for the pipeline to run.
#[allow(dead_code)]
pub const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.base_config.max_block = Some(max_block);
self
}
/// Uses the real pipeline instead of a pipeline with empty exec outputs.
#[allow(dead_code)]
pub fn with_real_pipeline(mut self) -> Self {
self.base_config.pipeline_config = TestPipelineConfig::Real;
self
}
/// Uses the real executor instead of a executor with empty results.
#[allow(dead_code)]
pub fn with_real_executor(mut self) -> Self {
self.base_config.executor_config = TestExecutorConfig::Real;
self
}
/// Disables blockchain tree driven sync. This is the same as setting the pipeline run
/// threshold to 0.
#[allow(dead_code)]
pub const fn disable_blockchain_tree_sync(mut self) -> Self {
self.base_config.pipeline_run_threshold = Some(0);
self
}
/// Sets the client to use for network operations.
#[allow(dead_code)]
pub fn with_client<ClientType>(
self,
client: ClientType,
) -> NetworkedTestConsensusEngineBuilder<ClientType>
where
ClientType: EthBlockClient + 'static,
{
NetworkedTestConsensusEngineBuilder { base_config: self.base_config, client: Some(client) }
}
/// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
pub fn build(self) -> (TestBeaconConsensusEngine<Client>, TestEnv<Arc<DatabaseEnv>>) {
reth_tracing::init_test_tracing();
let provider_factory =
create_test_provider_factory_with_chain_spec(self.base_config.chain_spec.clone());
let consensus: Arc<dyn FullConsensus<Error = ConsensusError>> =
match self.base_config.consensus {
TestConsensusConfig::Real => {
Arc::new(EthBeaconConsensus::new(Arc::clone(&self.base_config.chain_spec)))
}
TestConsensusConfig::Test => Arc::new(TestConsensus::default()),
};
let payload_builder = spawn_test_payload_service::<EthEngineTypes>();
// use either noop client or a user provided client (for example TestFullBlockClient)
let client = Arc::new(
self.client
.map(Either::Left)
.unwrap_or_else(|| Either::Right(NoopFullBlockClient::default())),
);
// use either test executor or real executor
let executor_factory = match self.base_config.executor_config {
TestExecutorConfig::Test(results) => {
let executor_factory = MockExecutorProvider::default();
executor_factory.extend(results);
Either::Left(executor_factory)
}
TestExecutorConfig::Real => {
Either::Right(EthExecutorProvider::ethereum(self.base_config.chain_spec.clone()))
}
};
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(B256::default());
let mut pipeline = match self.base_config.pipeline_config {
TestPipelineConfig::Test(outputs) => Pipeline::<MockNodeTypesWithDB>::builder()
.add_stages(TestStages::new(outputs, Default::default()))
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
let header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(client.clone(), consensus.clone().as_header_validator())
.into_task();
let body_downloader = BodiesDownloaderBuilder::default()
.build(
client.clone(),
consensus.clone().as_consensus(),
provider_factory.clone(),
)
.into_task();
Pipeline::<MockNodeTypesWithDB>::builder().add_stages(DefaultStages::new(
provider_factory.clone(),
tip_rx.clone(),
consensus.clone().as_consensus(),
header_downloader,
body_downloader,
executor_factory.clone(),
StageConfig::default(),
PruneModes::default(),
))
}
};
if let Some(max_block) = self.base_config.max_block {
pipeline = pipeline.with_max_block(max_block);
}
let pipeline = pipeline.build(provider_factory.clone(), static_file_producer);
// Setup blockchain tree
let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory);
let tree = Arc::new(ShareableBlockchainTree::new(
BlockchainTree::new(externals, BlockchainTreeConfig::new(1, 2, 3, 2))
.expect("failed to create tree"),
));
let header = self.base_config.chain_spec.genesis_header().clone();
let genesis_block = SealedHeader::seal(header);
let blockchain_provider = BlockchainProvider::with_blocks(
provider_factory.clone(),
tree,
genesis_block,
None,
None,
);
let pruner = Pruner::new_with_factory(
provider_factory.clone(),
vec![],
5,
self.base_config.chain_spec.prune_delete_limit,
None,
watch::channel(FinishedExExHeight::NoExExs).1,
);
let mut hooks = EngineHooks::new();
hooks.add(PruneHook::new(pruner, Box::<TokioTaskExecutor>::default()));
let (mut engine, handle) = BeaconConsensusEngine::new(
client,
pipeline,
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
None,
payload_builder,
None,
self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
hooks,
)
.expect("failed to create consensus engine");
if let Some(max_block) = self.base_config.max_block {
engine.sync.set_max_block(max_block)
}
(engine, TestEnv::new(provider_factory.db_ref().clone(), tip_rx, handle))
}
}
pub fn spawn_consensus_engine<Client>(
engine: TestBeaconConsensusEngine<Client>,
) -> oneshot::Receiver<Result<(), BeaconConsensusEngineError>>
where
Client: EthBlockClient + 'static,
{
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let result = engine.await;
tx.send(result).expect("failed to forward consensus engine result");
});
rx
}

View File

@ -1,14 +0,0 @@
//! Beacon consensus implementation.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub use reth_ethereum_consensus::EthBeaconConsensus;
mod engine;
pub use engine::*;