feat: new engine API handler (#8559)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2024-07-01 14:03:44 +02:00
committed by GitHub
parent 068bf57c06
commit 01979c4bde
17 changed files with 2208 additions and 11 deletions

View File

@ -6,8 +6,9 @@ crates/chainspec/ @Rjected @joshieDo @mattsse
crates/cli/ @onbjerg @mattsse crates/cli/ @onbjerg @mattsse
crates/config/ @onbjerg crates/config/ @onbjerg
crates/consensus/ @rkrasiuk @mattsse @Rjected crates/consensus/ @rkrasiuk @mattsse @Rjected
crates/engine @rkrasiuk @mattsse @Rjected
crates/e2e-test-utils/ @mattsse @Rjected crates/e2e-test-utils/ @mattsse @Rjected
crates/engine-primitives/ @rkrasiuk @mattsse @Rjected crates/engine/ @rkrasiuk @mattsse @Rjected @fgimenez
crates/errors/ @mattsse crates/errors/ @mattsse
crates/ethereum/ @mattsse @Rjected crates/ethereum/ @mattsse @Rjected
crates/ethereum-forks/ @mattsse @Rjected crates/ethereum-forks/ @mattsse @Rjected

44
Cargo.lock generated
View File

@ -6966,6 +6966,50 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "reth-engine-tree"
version = "1.0.0"
dependencies = [
"aquamarine",
"assert_matches",
"futures",
"metrics",
"parking_lot 0.12.3",
"reth-beacon-consensus",
"reth-blockchain-tree",
"reth-blockchain-tree-api",
"reth-chainspec",
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-engine-primitives",
"reth-errors",
"reth-ethereum-consensus",
"reth-evm",
"reth-metrics",
"reth-network-p2p",
"reth-payload-builder",
"reth-payload-primitives",
"reth-payload-validator",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-revm",
"reth-rpc-types",
"reth-stages",
"reth-stages-api",
"reth-static-file",
"reth-tasks",
"reth-tokio-util",
"reth-tracing",
"reth-trie",
"revm",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "reth-engine-util" name = "reth-engine-util"
version = "1.0.0" version = "1.0.0"

View File

@ -26,6 +26,7 @@ members = [
"crates/ethereum-forks/", "crates/ethereum-forks/",
"crates/e2e-test-utils/", "crates/e2e-test-utils/",
"crates/engine/primitives/", "crates/engine/primitives/",
"crates/engine/tree/",
"crates/engine/util/", "crates/engine/util/",
"crates/errors/", "crates/errors/",
"crates/ethereum-forks/", "crates/ethereum-forks/",
@ -287,6 +288,7 @@ reth-downloaders = { path = "crates/net/downloaders" }
reth-e2e-test-utils = { path = "crates/e2e-test-utils" } reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" } reth-ecies = { path = "crates/net/ecies" }
reth-engine-primitives = { path = "crates/engine/primitives" } reth-engine-primitives = { path = "crates/engine/primitives" }
reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-util = { path = "crates/engine/util" } reth-engine-util = { path = "crates/engine/util" }
reth-errors = { path = "crates/errors" } reth-errors = { path = "crates/errors" }
reth-eth-wire = { path = "crates/net/eth-wire" } reth-eth-wire = { path = "crates/net/eth-wire" }

View File

@ -3,7 +3,7 @@ use reth_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum};
/// The struct that keeps track of the received forkchoice state and their status. /// The struct that keeps track of the received forkchoice state and their status.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub(crate) struct ForkchoiceStateTracker { pub struct ForkchoiceStateTracker {
/// The latest forkchoice state that we received. /// The latest forkchoice state that we received.
/// ///
/// Caution: this can be invalid. /// Caution: this can be invalid.
@ -76,7 +76,7 @@ impl ForkchoiceStateTracker {
} }
/// Returns the last received `ForkchoiceState` to which we need to sync. /// Returns the last received `ForkchoiceState` to which we need to sync.
pub(crate) const fn sync_target_state(&self) -> Option<ForkchoiceState> { pub const fn sync_target_state(&self) -> Option<ForkchoiceState> {
self.last_syncing self.last_syncing
} }
@ -139,9 +139,12 @@ impl From<PayloadStatusEnum> for ForkchoiceStatus {
/// A helper type to check represent hashes of a [`ForkchoiceState`] /// A helper type to check represent hashes of a [`ForkchoiceState`]
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ForkchoiceStateHash { pub enum ForkchoiceStateHash {
/// Head hash of the [`ForkchoiceState`].
Head(B256), Head(B256),
/// Safe hash of the [`ForkchoiceState`].
Safe(B256), Safe(B256),
/// Finalized hash of the [`ForkchoiceState`].
Finalized(B256), Finalized(B256),
} }

View File

@ -14,7 +14,8 @@ use tracing::warn;
const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128; const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Keeps track of invalid headers. /// Keeps track of invalid headers.
pub(crate) struct InvalidHeaderCache { #[derive(Debug)]
pub struct InvalidHeaderCache {
/// This maps a header hash to a reference to its invalid ancestor. /// This maps a header hash to a reference to its invalid ancestor.
headers: LruMap<B256, HeaderEntry>, headers: LruMap<B256, HeaderEntry>,
/// Metrics for the cache. /// Metrics for the cache.
@ -34,7 +35,7 @@ impl InvalidHeaderCache {
/// ///
/// If this is called, the hit count for the entry is incremented. /// If this is called, the hit count for the entry is incremented.
/// If the hit count exceeds the threshold, the entry is evicted and `None` is returned. /// If the hit count exceeds the threshold, the entry is evicted and `None` is returned.
pub(crate) fn get(&mut self, hash: &B256) -> Option<Arc<Header>> { pub fn get(&mut self, hash: &B256) -> Option<Arc<Header>> {
{ {
let entry = self.headers.get(hash)?; let entry = self.headers.get(hash)?;
entry.hit_count += 1; entry.hit_count += 1;
@ -49,7 +50,7 @@ impl InvalidHeaderCache {
} }
/// Inserts an invalid block into the cache, with a given invalid ancestor. /// Inserts an invalid block into the cache, with a given invalid ancestor.
pub(crate) fn insert_with_invalid_ancestor( pub fn insert_with_invalid_ancestor(
&mut self, &mut self,
header_hash: B256, header_hash: B256,
invalid_ancestor: Arc<Header>, invalid_ancestor: Arc<Header>,

View File

@ -53,7 +53,7 @@ pub use error::{
}; };
mod invalid_headers; mod invalid_headers;
use invalid_headers::InvalidHeaderCache; pub use invalid_headers::InvalidHeaderCache;
mod event; mod event;
pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress}; pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
@ -62,13 +62,12 @@ mod handle;
pub use handle::BeaconConsensusEngineHandle; pub use handle::BeaconConsensusEngineHandle;
mod forkchoice; mod forkchoice;
pub use forkchoice::ForkchoiceStatus; pub use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus};
use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker};
mod metrics; mod metrics;
use metrics::EngineMetrics; use metrics::EngineMetrics;
pub(crate) mod sync; pub mod sync;
use sync::{EngineSyncController, EngineSyncEvent}; use sync::{EngineSyncController, EngineSyncEvent};
/// Hooks for running during the main loop of /// Hooks for running during the main loop of

View File

@ -0,0 +1,64 @@
[package]
name = "reth-engine-tree"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-beacon-consensus.workspace = true
reth-blockchain-tree.workspace = true
reth-blockchain-tree-api.workspace = true
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-db.workspace = true
reth-db-api.workspace = true
reth-engine-primitives.workspace = true
reth-errors.workspace = true
reth-ethereum-consensus.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-payload-validator.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-static-file.workspace = true
reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-trie.workspace = true
revm.workspace = true
# common
futures.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
tokio-stream = { workspace = true, features = ["sync"] }
# metrics
metrics.workspace = true
reth-metrics = { workspace = true, features = ["common"] }
# misc
aquamarine.workspace = true
parking_lot.workspace = true
tracing.workspace = true
[dev-dependencies]
# reth
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-prune-types.workspace = true
reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
assert_matches.workspace = true

View File

@ -0,0 +1,342 @@
//! It is expected that the node has two sync modes:
//!
//! - Backfill sync: Sync to a certain block height in stages, e.g. download data from p2p then
//! execute that range.
//! - Live sync: In this mode the nodes is keeping up with the latest tip and listens for new
//! requests from the consensus client.
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.
use futures::FutureExt;
use reth_db_api::database::Database;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
/// Backfill sync mode functionality.
pub trait BackfillSync: Send + Sync {
/// Performs a backfill action.
fn on_action(&mut self, action: BackfillAction);
/// Polls the pipeline for completion.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
}
/// The backfill actions that can be performed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackfillAction {
/// Start backfilling with the given target.
Start(PipelineTarget),
}
/// The events that can be emitted on backfill sync.
#[derive(Debug)]
pub enum BackfillEvent {
/// Backfill sync idle.
Idle,
/// Backfill sync started.
Started(PipelineTarget),
/// Backfill sync finished.
///
/// If this is returned, backfill sync is idle.
Finished(Result<ControlFlow, PipelineError>),
/// Sync task was dropped after it was started, unable to receive it because
/// channel closed. This would indicate a panicked task.
TaskDropped(String),
}
/// Pipeline sync.
#[derive(Debug)]
pub struct PipelineSync<DB>
where
DB: Database,
{
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<DB>,
/// Pending target block for the pipeline to sync
pending_pipeline_target: Option<PipelineTarget>,
}
impl<DB> PipelineSync<DB>
where
DB: Database + 'static,
{
/// Create a new instance.
pub fn new(pipeline: Pipeline<DB>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self {
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None,
}
}
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
const fn is_pipeline_sync_pending(&self) -> bool {
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is idle.
const fn is_pipeline_idle(&self) -> bool {
self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is active.
const fn is_pipeline_active(&self) -> bool {
!self.is_pipeline_idle()
}
/// Sets a new target to sync the pipeline to.
///
/// But ensures the target is not the zero hash.
fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
if target.sync_target().is_some_and(|target| target.is_zero()) {
trace!(
target: "consensus::engine::sync",
"Pipeline target cannot be zero hash."
);
// precaution to never sync to the zero hash
return
}
self.pending_pipeline_target = Some(target);
}
/// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
/// run continuously.
fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take()?;
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
);
self.pipeline_state = PipelineState::Running(rx);
Some(BackfillEvent::Started(target))
}
PipelineState::Running(_) => None,
}
}
/// Advances the pipeline state.
///
/// This checks for the result in the channel, or returns pending if the pipeline is idle.
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
let res = match self.pipeline_state {
PipelineState::Idle(_) => return Poll::Pending,
PipelineState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let ev = match res {
Ok((_, result)) => BackfillEvent::Finished(result),
Err(why) => {
// failed to receive the pipeline
BackfillEvent::TaskDropped(why.to_string())
}
};
Poll::Ready(ev)
}
}
impl<DB> BackfillSync for PipelineSync<DB>
where
DB: Database + 'static,
{
fn on_action(&mut self, event: BackfillAction) {
match event {
BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
// try to spawn a pipeline if a target is set
if let Some(event) = self.try_spawn_pipeline() {
return Poll::Ready(event)
}
// make sure we poll the pipeline if it's active, and return any ready pipeline events
if !self.is_pipeline_idle() {
// advance the pipeline
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
Poll::Pending
}
}
/// The possible pipeline states within the sync controller.
///
/// [`PipelineState::Idle`] means that the pipeline is currently idle.
/// [`PipelineState::Running`] means that the pipeline is currently running.
///
/// NOTE: The differentiation between these two states is important, because when the pipeline is
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
#[derive(Debug)]
enum PipelineState<DB: Database> {
/// Pipeline is idle.
Idle(Option<Pipeline<DB>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<DB>>),
}
impl<DB: Database> PipelineState<DB> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::insert_headers_into_client;
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockNumber, Header, B256};
use reth_provider::{
test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome,
};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages_api::StageCheckpoint;
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
use tokio::sync::watch;
struct TestHarness {
pipeline_sync: PipelineSync<Arc<TempDatabase<DatabaseEnv>>>,
tip: B256,
}
impl TestHarness {
fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
// force the pipeline to be "done" after 5 blocks
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
done: true,
})]))
.build(chain_spec);
let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
}
.seal_slow();
insert_headers_into_client(&client, header, 0..total_blocks);
let tip = client.highest_block().expect("there should be blocks here").hash();
Self { pipeline_sync, tip }
}
}
struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<ExecutionOutcome>,
}
impl TestPipelineBuilder {
/// Create a new [`TestPipelineBuilder`].
const fn new() -> Self {
Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() }
}
/// Set the pipeline execution outputs to use for the test consensus engine.
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}
/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_results = executor_results;
self
}
/// Builds the pipeline.
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<Arc<TempDatabase<DatabaseEnv>>> {
reth_tracing::init_test_tracing();
// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
pipeline.build(provider_factory, static_file_producer)
}
}
#[tokio::test]
async fn pipeline_started_and_finished() {
const TOTAL_BLOCKS: usize = 10;
const PIPELINE_DONE_AFTER: u64 = 5;
let TestHarness { mut pipeline_sync, tip } =
TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_event = poll!(sync_future);
// sync target not set, pipeline not started
assert_matches!(next_event, Poll::Pending);
pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_event = poll!(sync_future);
// sync target set, pipeline started
assert_matches!(next_event, Poll::Ready(BackfillEvent::Started(target)) => {
assert_eq!(target.sync_target().unwrap(), tip);
});
// the next event should be the pipeline finishing in a good state
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, BackfillEvent::Finished(result) => {
assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER }));
});
}
}

View File

@ -0,0 +1,218 @@
use crate::backfill::{BackfillAction, BackfillEvent, BackfillSync};
use futures::Stream;
use reth_stages_api::PipelineTarget;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// The type that drives the chain forward.
///
/// A state machine that orchestrates the components responsible for advancing the chain
///
///
/// ## Control flow
///
/// The [`ChainOrchestrator`] is responsible for controlling the pipeline sync and additional hooks.
/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
/// handler. However, due to database restrictions (e.g. exclusive write access), following
/// invariants apply:
/// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must
/// ensure that while the pipeline is running, no other write access is granted.
/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
/// request for write access.
///
/// The [`ChainOrchestrator`] polls the [`ChainHandler`] to advance the chain and handles the
/// emitted events. Requests and events are passed to the [`ChainHandler`] via
/// [`ChainHandler::on_event`].
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ChainOrchestrator<T, P>
where
T: ChainHandler,
P: BackfillSync,
{
/// The handler for advancing the chain.
handler: T,
/// Controls pipeline sync.
pipeline: P,
}
impl<T, P> ChainOrchestrator<T, P>
where
T: ChainHandler + Unpin,
P: BackfillSync + Unpin,
{
/// Creates a new [`ChainOrchestrator`] with the given handler and pipeline.
pub const fn new(handler: T, pipeline: P) -> Self {
Self { handler, pipeline }
}
/// Returns the handler
pub const fn handler(&self) -> &T {
&self.handler
}
/// Returns a mutable reference to the handler
pub fn handler_mut(&mut self) -> &mut T {
&mut self.handler
}
/// Internal function used to advance the chain.
///
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
let this = self.get_mut();
// This loop polls the components
//
// 1. Polls the pipeline to completion, if active.
// 2. Advances the chain by polling the handler.
'outer: loop {
// try to poll the pipeline to completion, if active
match this.pipeline.poll(cx) {
Poll::Ready(pipeline_event) => match pipeline_event {
BackfillEvent::Idle => {}
BackfillEvent::Started(_) => {
// notify handler that pipeline started
this.handler.on_event(FromOrchestrator::PipelineStarted);
return Poll::Ready(ChainEvent::PipelineStarted);
}
BackfillEvent::Finished(res) => {
return match res {
Ok(event) => {
tracing::debug!(?event, "pipeline finished");
// notify handler that pipeline finished
this.handler.on_event(FromOrchestrator::PipelineFinished);
Poll::Ready(ChainEvent::PipelineFinished)
}
Err(err) => {
tracing::error!( %err, "pipeline failed");
Poll::Ready(ChainEvent::FatalError)
}
}
}
BackfillEvent::TaskDropped(err) => {
tracing::error!( %err, "pipeline task dropped");
return Poll::Ready(ChainEvent::FatalError);
}
},
Poll::Pending => {}
}
// poll the handler for the next event
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(BackfillAction::Start(target));
continue 'outer
}
HandlerEvent::Event(ev) => {
// bubble up the event
return Poll::Ready(ChainEvent::Handler(ev));
}
}
}
Poll::Pending => {
// no more events to process
break 'outer
}
}
}
Poll::Pending
}
}
impl<T, P> Stream for ChainOrchestrator<T, P>
where
T: ChainHandler + Unpin,
P: BackfillSync + Unpin,
{
type Item = ChainEvent<T::Event>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut().poll_next_event(cx).map(Some)
}
}
/// Represents the sync mode the chain is operating in.
#[derive(Debug, Default)]
enum SyncMode {
#[default]
Handler,
Backfill,
}
/// Event emitted by the [`ChainOrchestrator`]
///
/// These are meant to be used for observability and debugging purposes.
#[derive(Debug)]
pub enum ChainEvent<T> {
/// Pipeline sync started
PipelineStarted,
/// Pipeline sync finished
PipelineFinished,
/// Fatal error
FatalError,
/// Event emitted by the handler
Handler(T),
}
/// A trait that advances the chain by handling actions.
///
/// This is intended to be implement the chain consensus logic, for example `engine` API.
pub trait ChainHandler: Send + Sync {
/// Event generated by this handler that orchestrator can bubble up;
type Event: Send;
/// Informs the handler about an event from the [`ChainOrchestrator`].
fn on_event(&mut self, event: FromOrchestrator);
/// Polls for actions that [`ChainOrchestrator`] should handle.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>>;
}
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent<T> {
/// Request to start a pipeline sync
Pipeline(PipelineTarget),
/// Other event emitted by the handler
Event(T),
}
/// Internal events issued by the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum FromOrchestrator {
/// Invoked when pipeline sync finished
PipelineFinished,
/// Invoked when pipeline started
PipelineStarted,
}
/// Represents the state of the chain.
#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
pub enum OrchestratorState {
/// Orchestrator has exclusive write access to the database.
PipelineActive,
/// Node is actively processing the chain.
#[default]
Idle,
}
impl OrchestratorState {
/// Returns `true` if the state is [`OrchestratorState::PipelineActive`].
pub const fn is_pipeline_active(&self) -> bool {
matches!(self, Self::PipelineActive)
}
/// Returns `true` if the state is [`OrchestratorState::Idle`].
pub const fn is_idle(&self) -> bool {
matches!(self, Self::Idle)
}
}

View File

@ -0,0 +1,414 @@
//! Handler that can download blocks on demand (e.g. from the network).
use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
use futures::FutureExt;
use reth_consensus::Consensus;
use reth_network_p2p::{
bodies::client::BodiesClient,
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
headers::client::HeadersClient,
};
use reth_primitives::{SealedBlock, SealedBlockWithSenders, B256};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet},
sync::Arc,
task::{Context, Poll},
};
use tracing::trace;
/// A trait that can download blocks on demand.
pub trait BlockDownloader: Send + Sync {
/// Handle an action.
fn on_action(&mut self, event: DownloadAction);
/// Advance in progress requests if any
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome>;
}
/// Actions that can be performed by the block downloader.
#[derive(Debug)]
pub enum DownloadAction {
/// Stop downloading blocks.
Clear,
/// Download given blocks
Download(DownloadRequest),
}
/// Outcome of downloaded blocks.
#[derive(Debug)]
pub enum DownloadOutcome {
/// Downloaded blocks.
Blocks(Vec<SealedBlockWithSenders>),
}
/// Basic [`BlockDownloader`].
pub struct BasicBlockDownloader<Client>
where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// A downloader that can download full blocks from the network.
full_block_client: FullBlockClient<Client>,
/// In-flight full block requests in progress.
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders>>,
/// Engine download metrics.
metrics: BlockDownloaderMetrics,
}
impl<Client> BasicBlockDownloader<Client>
where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Create a new instance
pub(crate) fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
Self {
full_block_client: FullBlockClient::new(client, consensus),
inflight_full_block_requests: Vec::new(),
inflight_block_range_requests: Vec::new(),
set_buffered_blocks: BinaryHeap::new(),
metrics: BlockDownloaderMetrics::default(),
}
}
/// Clears the stored inflight requests.
fn clear(&mut self) {
self.inflight_full_block_requests.clear();
self.inflight_block_range_requests.clear();
self.set_buffered_blocks.clear();
self.update_block_download_metrics();
}
/// Processes a download request.
fn download(&mut self, request: DownloadRequest) {
match request {
DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
}
}
/// Processes a block set download request.
fn download_block_set(&mut self, hashes: HashSet<B256>) {
for hash in hashes {
self.download_full_block(hash);
}
}
/// Processes a block range download request.
fn download_block_range(&mut self, hash: B256, count: u64) {
if count == 1 {
self.download_full_block(hash);
} else {
trace!(
target: "consensus::engine",
?hash,
?count,
"start downloading full block range."
);
let request = self.full_block_client.get_full_block_range(hash, count);
self.inflight_block_range_requests.push(request);
}
}
/// Starts requesting a full block from the network.
///
/// Returns `true` if the request was started, `false` if there's already a request for the
/// given hash.
fn download_full_block(&mut self, hash: B256) -> bool {
if self.is_inflight_request(hash) {
return false
}
trace!(
target: "consensus::engine::sync",
?hash,
"Start downloading full block"
);
let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);
self.update_block_download_metrics();
true
}
/// Returns true if there's already a request for the given hash.
fn is_inflight_request(&self, hash: B256) -> bool {
self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
}
/// Sets the metrics for the active downloads
fn update_block_download_metrics(&self) {
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
// TODO: full block range metrics
}
}
impl<Client> BlockDownloader for BasicBlockDownloader<Client>
where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Handles incoming download actions.
fn on_action(&mut self, event: DownloadAction) {
match event {
DownloadAction::Clear => self.clear(),
DownloadAction::Download(request) => self.download(request),
}
}
/// Advances the download process.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
// advance all full block requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
if let Poll::Ready(block) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
self.set_buffered_blocks.push(Reverse(block.into()));
} else {
// still pending
self.inflight_full_block_requests.push(request);
}
}
// advance all full block range requests
for idx in (0..self.inflight_block_range_requests.len()).rev() {
let mut request = self.inflight_block_range_requests.swap_remove(idx);
if let Poll::Ready(blocks) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
self.set_buffered_blocks.extend(
blocks
.into_iter()
.map(|b| {
let senders = b.senders().unwrap_or_default();
OrderedSealedBlockWithSenders(SealedBlockWithSenders {
block: b,
senders,
})
})
.map(Reverse),
);
} else {
// still pending
self.inflight_block_range_requests.push(request);
}
}
self.update_block_download_metrics();
if self.set_buffered_blocks.is_empty() {
return Poll::Pending;
}
// drain all unique element of the block buffer if there are any
let mut downloaded_blocks: Vec<SealedBlockWithSenders> =
Vec::with_capacity(self.set_buffered_blocks.len());
while let Some(block) = self.set_buffered_blocks.pop() {
// peek ahead and pop duplicates
while let Some(peek) = self.set_buffered_blocks.peek_mut() {
if peek.0 .0.hash() == block.0 .0.hash() {
PeekMut::pop(peek);
} else {
break
}
}
downloaded_blocks.push(block.0.into());
}
Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
}
}
/// A wrapper type around [`SealedBlockWithSenders`] that implements the [Ord]
/// trait by block number.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlockWithSenders(SealedBlockWithSenders);
impl PartialOrd for OrderedSealedBlockWithSenders {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for OrderedSealedBlockWithSenders {
fn cmp(&self, other: &Self) -> Ordering {
self.0.number.cmp(&other.0.number)
}
}
impl From<SealedBlock> for OrderedSealedBlockWithSenders {
fn from(block: SealedBlock) -> Self {
let senders = block.senders().unwrap_or_default();
Self(SealedBlockWithSenders { block, senders })
}
}
impl From<OrderedSealedBlockWithSenders> for SealedBlockWithSenders {
fn from(value: OrderedSealedBlockWithSenders) -> Self {
let senders = value.0.senders;
Self { block: value.0.block, senders }
}
}
/// A [`BlockDownloader`] that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader;
impl BlockDownloader for NoopBlockDownloader {
fn on_action(&mut self, _event: DownloadAction) {}
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::insert_headers_into_client;
use assert_matches::assert_matches;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, Header};
use std::{future::poll_fn, sync::Arc};
struct TestHarness {
block_downloader: BasicBlockDownloader<TestFullBlockClient>,
client: TestFullBlockClient,
}
impl TestHarness {
fn new(total_blocks: usize) -> Self {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
}
.seal_slow();
insert_headers_into_client(&client, header, 0..total_blocks);
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));
let block_downloader = BasicBlockDownloader::new(client.clone(), consensus);
Self { block_downloader, client }
}
}
#[tokio::test]
async fn block_downloader_range_request() {
const TOTAL_BLOCKS: usize = 10;
let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
let tip = client.highest_block().expect("there should be blocks here");
// send block range download request
block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
tip.hash(),
tip.number,
)));
// ensure we have one in flight range request
assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
// ensure the range request is made correctly
let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
assert_eq!(first_req.start_hash(), tip.hash());
assert_eq!(first_req.count(), tip.number);
// poll downloader
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);
// ensure they are in ascending order
for num in 1..=TOTAL_BLOCKS {
assert_eq!(blocks[num-1].number, num as u64);
}
});
}
#[tokio::test]
async fn block_downloader_set_request() {
const TOTAL_BLOCKS: usize = 2;
let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
let tip = client.highest_block().expect("there should be blocks here");
// send block set download request
block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(
HashSet::from([tip.hash(), tip.parent_hash]),
)));
// ensure we have TOTAL_BLOCKS in flight full block request
assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);
// poll downloader
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);
// ensure they are in ascending order
for num in 1..=TOTAL_BLOCKS {
assert_eq!(blocks[num-1].number, num as u64);
}
});
}
#[tokio::test]
async fn block_downloader_clear_request() {
const TOTAL_BLOCKS: usize = 10;
let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
let tip = client.highest_block().expect("there should be blocks here");
// send block range download request
block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
tip.hash(),
tip.number,
)));
// send block set download request
let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
block_downloader
.on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));
// ensure we have one in flight range request
assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
// ensure the range request is made correctly
let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
assert_eq!(first_req.start_hash(), tip.hash());
assert_eq!(first_req.count(), tip.number);
// ensure we have download_set.len() in flight full block request
assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());
// send clear request
block_downloader.on_action(DownloadAction::Clear);
// ensure we have no in flight range request
assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);
// ensure we have no in flight full block request
assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
}
}

View File

@ -0,0 +1,212 @@
//! An engine API handler for the chain.
use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
task::{Context, Poll},
};
use tokio::sync::mpsc;
/// Advances the chain based on incoming requests.
///
/// This is a general purpose request handler with network access.
/// This type listens for incoming messages and processes them via the configured request handler.
///
/// ## Overview
///
/// This type is an orchestrator for incoming messages and responsible for delegating requests
/// received from the CL to the handler.
///
/// It is responsible for handling the following:
/// - Downloading blocks on demand from the network if requested by the [`EngineApiRequestHandler`].
///
/// The core logic is part of the [`EngineRequestHandler`], which is responsible for processing the
/// incoming requests.
#[derive(Debug)]
pub struct EngineHandler<T, S, D> {
/// Processes requests.
///
/// This type is responsible for processing incoming requests.
handler: T,
/// Receiver for incoming requests that need to be processed.
incoming_requests: S,
/// A downloader to download blocks on demand.
downloader: D,
}
impl<T, S, D> EngineHandler<T, S, D> {
/// Creates a new [`EngineHandler`] with the given handler and downloader.
pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
where
T: EngineRequestHandler,
{
Self { handler, incoming_requests, downloader }
}
}
impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
where
T: EngineRequestHandler,
S: Stream<Item = T::Request> + Send + Sync + Unpin + 'static,
D: BlockDownloader,
{
type Event = T::Event;
fn on_event(&mut self, event: FromOrchestrator) {
// delegate event to the handler
self.handler.on_event(event.into());
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
loop {
// drain the handler first
while let Poll::Ready(ev) = self.handler.poll(cx) {
match ev {
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
return match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
self.downloader.on_action(DownloadAction::Clear);
Poll::Ready(HandlerEvent::Pipeline(target))
}
HandlerEvent::Event(ev) => {
// bubble up the event
Poll::Ready(HandlerEvent::Event(ev))
}
}
}
RequestHandlerEvent::Download(req) => {
// delegate download request to the downloader
self.downloader.on_action(DownloadAction::Download(req));
}
}
}
// pop the next incoming request
if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
// and delegate the request to the handler
self.handler.on_event(FromEngine::Request(req));
// skip downloading in this iteration to allow the handler to process the request
continue
}
// advance the downloader
if let Poll::Ready(DownloadOutcome::Blocks(blocks)) = self.downloader.poll(cx) {
// delegate the downloaded blocks to the handler
self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
continue
}
return Poll::Pending
}
}
}
/// A type that processes incoming requests (e.g. requests from the consensus layer, engine API)
pub trait EngineRequestHandler: Send + Sync {
/// Even type this handler can emit
type Event: Send;
/// The request type this handler can process.
type Request;
/// Informs the handler about an event from the [`EngineHandler`].
fn on_event(&mut self, event: FromEngine<Self::Request>);
/// Advances the handler.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
}
/// An [`EngineRequestHandler`] that processes engine API requests by delegating to an execution
/// task.
///
/// This type is responsible for advancing the chain during live sync (following the tip of the
/// chain).
///
/// It advances the chain based on received engine API requests by delegating them to the tree
/// executor.
///
/// There are two types of requests that can be processed:
///
/// - `on_new_payload`: Executes the payload and inserts it into the tree. These are allowed to be
/// processed concurrently.
/// - `on_forkchoice_updated`: Updates the fork choice based on the new head. These require write
/// access to the database and are skipped if the handler can't acquire exclusive access to the
/// database.
///
/// In case required blocks are missing, the handler will request them from the network, by emitting
/// a download request upstream.
#[derive(Debug)]
pub struct EngineApiRequestHandler<T: EngineTypes> {
/// channel to send messages to the tree to execute the payload.
to_tree: std::sync::mpsc::Sender<FromEngine<BeaconEngineMessage<T>>>,
/// channel to receive messages from the tree.
from_tree: mpsc::UnboundedReceiver<EngineApiEvent>,
// TODO add db controller
}
impl<T> EngineApiRequestHandler<T> where T: EngineTypes {}
impl<T> EngineRequestHandler for EngineApiRequestHandler<T>
where
T: EngineTypes,
{
type Event = EngineApiEvent;
type Request = BeaconEngineMessage<T>;
fn on_event(&mut self, event: FromEngine<Self::Request>) {
// delegate to the tree
let _ = self.to_tree.send(event);
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
todo!("poll tree and handle db")
}
}
/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {}
#[derive(Debug)]
pub enum FromEngine<Req> {
/// Event from the top level orchestrator.
Event(FromOrchestrator),
/// Request from the engine
Request(Req),
/// Downloaded blocks from the network.
DownloadedBlocks(Vec<SealedBlockWithSenders>),
}
impl<Req> From<FromOrchestrator> for FromEngine<Req> {
fn from(event: FromOrchestrator) -> Self {
Self::Event(event)
}
}
/// Requests produced by a [`EngineRequestHandler`].
#[derive(Debug)]
pub enum RequestHandlerEvent<T> {
/// The handler is idle.
Idle,
/// An event emitted by the handler.
HandlerEvent(HandlerEvent<T>),
/// Request to download blocks.
Download(DownloadRequest),
}
/// A request to download blocks from the network.
#[derive(Debug)]
pub enum DownloadRequest {
/// Download the given set of blocks.
BlockSet(HashSet<B256>),
/// Download the given range of blocks.
BlockRange(B256, u64),
}

View File

@ -0,0 +1,31 @@
//! This crate includes the core components for advancing a reth chain.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
// #![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![allow(missing_docs, dead_code, missing_debug_implementations, unused_variables)] // TODO rm
/// Re-export of the blockchain tree API.
pub use reth_blockchain_tree_api::*;
/// Support for backfill sync mode.
pub mod backfill;
/// The type that drives the chain forward.
pub mod chain;
/// Support for downloading blocks on demand for live sync.
pub mod download;
/// Engine Api chain handler support.
pub mod engine;
/// Metrics support.
pub mod metrics;
/// The background writer task for batch db writes.
pub mod persistence;
/// Support for interacting with the blockchain tree.
pub mod tree;
#[cfg(test)]
mod test_utils;

View File

@ -0,0 +1,9 @@
use reth_metrics::{metrics::Gauge, Metrics};
/// Metrics for the `BasicBlockDownloader`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct BlockDownloaderMetrics {
/// How many blocks are currently being downloaded.
pub(crate) active_block_downloads: Gauge,
}

View File

@ -0,0 +1,139 @@
#![allow(dead_code)]
use crate::tree::ExecutedBlock;
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::B256;
use reth_provider::ProviderFactory;
use std::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
/// Writes parts of reth's in memory tree state to the database.
///
/// This is meant to be a spawned task that listens for various incoming persistence operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// There are two types of operations this task can perform:
/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted.
/// - Removing blocks from disk, returning the removed blocks.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking database operations in an endless loop.
#[derive(Debug)]
pub struct Persistence<DB> {
/// The db / static file provider to use
provider: ProviderFactory<DB>,
/// Incoming requests to persist stuff
incoming: Receiver<PersistenceAction>,
}
impl<DB: Database> Persistence<DB> {
/// Create a new persistence task
const fn new(provider: ProviderFactory<DB>, incoming: Receiver<PersistenceAction>) -> Self {
Self { provider, incoming }
}
/// Writes the cloned tree state to the database
fn write(&self, _blocks: Vec<ExecutedBlock>) -> ProviderResult<()> {
let mut _rw = self.provider.provider_rw()?;
todo!("implement this")
}
/// Removes the blocks above the give block number from the database, returning them.
fn remove_blocks_above(&self, _block_number: u64) -> Vec<ExecutedBlock> {
todo!("implement this")
}
}
impl<DB> Persistence<DB>
where
DB: Database + 'static,
{
/// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`].
fn spawn_new(provider: ProviderFactory<DB>) -> PersistenceHandle {
let (tx, rx) = std::sync::mpsc::channel();
let task = Self::new(provider, rx);
std::thread::Builder::new()
.name("Persistence Task".to_string())
.spawn(|| task.run())
.unwrap();
PersistenceHandle::new(tx)
}
}
impl<DB> Persistence<DB>
where
DB: Database,
{
/// This is the main loop, that will listen to persistence events and perform the requested
/// database actions
fn run(self) {
// If the receiver errors then senders have disconnected, so the loop should then end.
while let Ok(action) = self.incoming.recv() {
match action {
PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => {
// spawn blocking so we can poll the thread later
let output = self.remove_blocks_above(new_tip_num);
sender.send(output).unwrap();
}
PersistenceAction::SaveBlocks((blocks, sender)) => {
if blocks.is_empty() {
todo!("return error or something");
}
let last_block_hash = blocks.last().unwrap().block().hash();
self.write(blocks).unwrap();
sender.send(last_block_hash).unwrap();
}
}
}
}
}
/// A signal to the persistence task that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction {
/// The section of tree state that should be persisted. These blocks are expected in order of
/// increasing block number.
SaveBlocks((Vec<ExecutedBlock>, oneshot::Sender<B256>)),
/// Removes the blocks above the given block number from the database.
RemoveBlocksAbove((u64, oneshot::Sender<Vec<ExecutedBlock>>)),
}
/// A handle to the persistence task
#[derive(Debug, Clone)]
pub struct PersistenceHandle {
/// The channel used to communicate with the persistence task
sender: Sender<PersistenceAction>,
}
impl PersistenceHandle {
/// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
pub const fn new(sender: Sender<PersistenceAction>) -> Self {
Self { sender }
}
/// Tells the persistence task to save a certain list of finalized blocks. The blocks are
/// assumed to be ordered by block number.
///
/// This returns the latest hash that has been saved, allowing removal of that block and any
/// previous blocks from in-memory data structures.
pub async fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> B256 {
let (tx, rx) = oneshot::channel();
self.sender
.send(PersistenceAction::SaveBlocks((blocks, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
/// Tells the persistence task to remove blocks above a certain block number. The removed blocks
/// are returned by the task.
pub async fn remove_blocks_above(&self, block_num: u64) -> Vec<ExecutedBlock> {
let (tx, rx) = oneshot::channel();
self.sender
.send(PersistenceAction::RemoveBlocksAbove((block_num, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
}

View File

@ -0,0 +1,21 @@
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::{BlockBody, SealedHeader};
use std::ops::Range;
pub(crate) fn insert_headers_into_client(
client: &TestFullBlockClient,
genesis_header: SealedHeader,
range: Range<usize>,
) {
let mut sealed_header = genesis_header;
let body = BlockBody::default();
for _ in range {
let (mut header, hash) = sealed_header.split();
// update to the next header
header.parent_hash = hash;
header.number += 1;
header.timestamp += 1;
sealed_header = header.seal_slow();
client.insert(sealed_header.clone(), body.clone());
}
}

View File

@ -0,0 +1,123 @@
use super::ExecutedBlock;
use reth_errors::ProviderResult;
use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256};
use reth_provider::{AccountReader, BlockHashReader, StateProvider, StateRootProvider};
use reth_trie::{updates::TrieUpdates, AccountProof};
use revm::db::BundleState;
/// A state provider that stores references to in-memory blocks along with their state as well as
/// the historical state provider for fallback lookups.
#[derive(Debug)]
pub struct MemoryOverlayStateProvider<H> {
/// The collection of executed parent blocks.
in_memory: Vec<ExecutedBlock>,
/// Historical state provider for state lookups that are not found in in-memory blocks.
historical: H,
}
impl<H> MemoryOverlayStateProvider<H> {
/// Create new memory overlay state provider.
pub const fn new(in_memory: Vec<ExecutedBlock>, historical: H) -> Self {
Self { in_memory, historical }
}
}
impl<H> BlockHashReader for MemoryOverlayStateProvider<H>
where
H: BlockHashReader,
{
fn block_hash(&self, number: BlockNumber) -> ProviderResult<Option<B256>> {
for block in self.in_memory.iter().rev() {
if block.block.number == number {
return Ok(Some(block.block.hash()))
}
}
self.historical.block_hash(number)
}
fn canonical_hashes_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
let range = start..end;
let mut earliest_block_number = None;
let mut in_memory_hashes = Vec::new();
for block in self.in_memory.iter().rev() {
if range.contains(&block.block.number) {
in_memory_hashes.insert(0, block.block.hash());
earliest_block_number = Some(block.block.number);
}
}
let mut hashes =
self.historical.canonical_hashes_range(start, earliest_block_number.unwrap_or(end))?;
hashes.append(&mut in_memory_hashes);
Ok(hashes)
}
}
impl<H> AccountReader for MemoryOverlayStateProvider<H>
where
H: AccountReader + Send,
{
fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
for block in self.in_memory.iter().rev() {
if let Some(account) = block.execution_output.account(&address) {
return Ok(account)
}
}
self.historical.basic_account(address)
}
}
impl<H> StateRootProvider for MemoryOverlayStateProvider<H>
where
H: StateRootProvider + Send,
{
fn state_root(&self, bundle_state: &BundleState) -> ProviderResult<B256> {
todo!()
}
fn state_root_with_updates(
&self,
bundle_state: &BundleState,
) -> ProviderResult<(B256, TrieUpdates)> {
todo!()
}
}
impl<H> StateProvider for MemoryOverlayStateProvider<H>
where
H: StateProvider + Send,
{
fn storage(
&self,
address: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
for block in self.in_memory.iter().rev() {
if let Some(value) = block.execution_output.storage(&address, storage_key.into()) {
return Ok(Some(value))
}
}
self.historical.storage(address, storage_key)
}
fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult<Option<Bytecode>> {
for block in self.in_memory.iter().rev() {
if let Some(contract) = block.execution_output.bytecode(&code_hash) {
return Ok(Some(contract))
}
}
self.historical.bytecode_by_hash(code_hash)
}
fn proof(&self, address: Address, keys: &[B256]) -> ProviderResult<AccountProof> {
todo!()
}
}

View File

@ -0,0 +1,574 @@
use crate::{backfill::BackfillAction, engine::DownloadRequest};
use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated};
use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
};
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_primitives::PayloadTypes;
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
Address, Block, BlockNumber, Receipts, Requests, SealedBlock, SealedBlockWithSenders, B256,
U256,
};
use reth_provider::{BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
engine::{
CancunPayloadFields, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
},
ExecutionPayload,
};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::Arc,
};
use tracing::*;
mod memory_overlay;
pub use memory_overlay::MemoryOverlayStateProvider;
/// Represents an executed block stored in-memory.
#[derive(Clone, Debug)]
pub struct ExecutedBlock {
block: Arc<SealedBlock>,
senders: Arc<Vec<Address>>,
execution_output: Arc<ExecutionOutcome>,
hashed_state: Arc<HashedPostState>,
trie: Arc<TrieUpdates>,
}
impl ExecutedBlock {
/// Returns a reference to the executed block.
pub(crate) fn block(&self) -> &SealedBlock {
&self.block
}
}
/// Keeps track of the state of the tree.
#[derive(Debug)]
pub struct TreeState {
/// All executed blocks by hash.
blocks_by_hash: HashMap<B256, ExecutedBlock>,
/// Executed blocks grouped by their respective block number.
blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock>>,
}
impl TreeState {
fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock>> {
self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
}
/// Insert executed block into the state.
fn insert_executed(&mut self, executed: ExecutedBlock) {
self.blocks_by_number.entry(executed.block.number).or_default().push(executed.clone());
let existing = self.blocks_by_hash.insert(executed.block.hash(), executed);
debug_assert!(existing.is_none(), "inserted duplicate block");
}
/// Remove blocks before specified block number.
pub(crate) fn remove_before(&mut self, block_number: BlockNumber) {
while self
.blocks_by_number
.first_key_value()
.map(|entry| entry.0 < &block_number)
.unwrap_or_default()
{
let (_, to_remove) = self.blocks_by_number.pop_first().unwrap();
for block in to_remove {
let block_hash = block.block.hash();
let removed = self.blocks_by_hash.remove(&block_hash);
debug_assert!(
removed.is_some(),
"attempted to remove non-existing block {block_hash}"
);
}
}
}
}
/// Tracks the state of the engine api internals.
///
/// This type is shareable.
#[derive(Debug)]
pub struct EngineApiTreeState {
/// Tracks the state of the blockchain tree.
tree_state: TreeState,
/// Tracks the received forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// Buffer of detached blocks.
buffer: BlockBuffer,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
}
/// The type responsible for processing engine API requests.
///
/// TODO: design: should the engine handler functions also accept the response channel or return the
/// result and the caller redirects the response
pub trait EngineApiTreeHandler: Send + Sync {
/// The engine type that this handler is for.
type Engine: EngineTypes;
/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent>;
/// When the Consensus layer receives a new block via the consensus gossip protocol,
/// the transactions in the block are sent to the execution layer in the form of a
/// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
/// state in the block header, then passes validation data back to Consensus layer, that
/// adds the block to the head of its own blockchain and attests to it. The block is then
/// broadcast over the consensus p2p network in the form of a "Beacon block".
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
fn on_new_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> ProviderResult<TreeOutcome<PayloadStatus>>;
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>>;
}
/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
/// The outcome of the operation.
pub outcome: T,
/// An optional event to tell the caller to do something.
pub event: Option<TreeEvent>,
}
impl<T> TreeOutcome<T> {
/// Create new tree outcome.
pub const fn new(outcome: T) -> Self {
Self { outcome, event: None }
}
/// Set event on the outcome.
pub fn with_event(mut self, event: TreeEvent) -> Self {
self.event = Some(event);
self
}
}
/// Events that can be emitted by the [`EngineApiTreeHandler`].
#[derive(Debug)]
pub enum TreeEvent {
/// Tree action is needed.
TreeAction(TreeAction),
/// Backfill action is needed.
BackfillAction(BackfillAction),
/// Block download is needed.
Download(DownloadRequest),
}
/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
/// Make target canonical.
MakeCanonical(B256),
}
#[derive(Debug)]
pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
state: EngineApiTreeState,
/// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool,
_marker: PhantomData<T>,
}
impl<P, E, T> EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory,
E: BlockExecutorProvider,
T: EngineTypes,
{
/// Return block from database or in-memory state by hash.
fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
// check database first
let mut block = self.provider.block_by_hash(hash)?;
if block.is_none() {
// Note: it's fine to return the unsealed block because the caller already has
// the hash
block = self
.state
.tree_state
.block_by_hash(hash)
// TODO: clone for compatibility. should we return an Arc here?
.map(|block| block.as_ref().clone().unseal());
}
Ok(block)
}
/// Return state provider with reference to in-memory blocks that overlay database state.
fn state_provider(
&self,
hash: B256,
) -> ProviderResult<MemoryOverlayStateProvider<Box<dyn StateProvider>>> {
let mut in_memory = Vec::new();
let mut parent_hash = hash;
while let Some(executed) = self.state.tree_state.blocks_by_hash.get(&parent_hash) {
parent_hash = executed.block.parent_hash;
in_memory.insert(0, executed.clone());
}
let historical = self.provider.state_by_block_hash(parent_hash)?;
Ok(MemoryOverlayStateProvider::new(in_memory, historical))
}
/// Return the parent hash of the lowest buffered ancestor for the requested block, if there
/// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
/// not exist in the buffer, this returns the hash that is passed in.
///
/// Returns the parent hash of the block itself if the block is buffered and has no other
/// buffered ancestors.
fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
self.state
.buffer
.lowest_ancestor(&hash)
.map(|block| block.parent_hash)
.unwrap_or_else(|| hash)
}
/// If validation fails, the response MUST contain the latest valid hash:
///
/// - The block hash of the ancestor of the invalid payload satisfying the following two
/// conditions:
/// - It is fully validated and deemed VALID
/// - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
/// - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
/// conditions are satisfied by a `PoW` block.
/// - null if client software cannot determine the ancestor of the invalid payload satisfying
/// the above conditions.
fn latest_valid_hash_for_invalid_payload(
&mut self,
parent_hash: B256,
) -> ProviderResult<Option<B256>> {
// Check if parent exists in side chain or in canonical chain.
if self.block_by_hash(parent_hash)?.is_some() {
return Ok(Some(parent_hash))
}
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.state.invalid_headers.get(&current_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.state.invalid_headers.get(&current_hash);
// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() && self.block_by_hash(current_hash)?.is_some() {
return Ok(Some(current_hash))
}
}
Ok(None)
}
/// Prepares the invalid payload response for the given hash, checking the
/// database for the parent hash and populating the payload status with the latest valid hash
/// according to the engine api spec.
fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Some(parent) = self.block_by_hash(parent_hash)? {
if !parent.is_zero_difficulty() {
parent_hash = B256::ZERO;
}
}
let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
})
.with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
}
/// Checks if the given `check` hash points to an invalid header, inserting the given `head`
/// block into the invalid header cache if the `check` hash has a known invalid ancestor.
///
/// Returns a payload status response according to the engine API spec if the block is known to
/// be invalid.
fn check_invalid_ancestor_with_head(
&mut self,
check: B256,
head: B256,
) -> ProviderResult<Option<PayloadStatus>> {
// check if the check hash was previously marked as invalid
let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
// populate the latest valid hash field
let status = self.prepare_invalid_response(header.parent_hash)?;
// insert the head block into the invalid header cache
self.state.invalid_headers.insert_with_invalid_ancestor(head, header);
Ok(Some(status))
}
/// Validate if block is correct and satisfies all the consensus rules that concern the header
/// and block body itself.
fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) {
error!(
?block,
"Failed to validate total difficulty for block {}: {e}",
block.header.hash()
);
return Err(e)
}
if let Err(e) = self.consensus.validate_header(block) {
error!(?block, "Failed to validate header {}: {e}", block.header.hash());
return Err(e)
}
if let Err(e) = self.consensus.validate_block_pre_execution(block) {
error!(?block, "Failed to validate block {}: {e}", block.header.hash());
return Err(e)
}
Ok(())
}
fn buffer_block_without_senders(&mut self, block: SealedBlock) -> Result<(), InsertBlockError> {
match block.try_seal_with_senders() {
Ok(block) => self.buffer_block(block),
Err(block) => Err(InsertBlockError::sender_recovery_error(block)),
}
}
fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
if let Err(err) = self.validate_block(&block) {
return Err(InsertBlockError::consensus_error(err, block.block))
}
self.state.buffer.insert_block(block);
Ok(())
}
fn insert_block_without_senders(
&mut self,
block: SealedBlock,
) -> Result<InsertPayloadOk, InsertBlockError> {
match block.try_seal_with_senders() {
Ok(block) => self.insert_block(block),
Err(block) => Err(InsertBlockError::sender_recovery_error(block)),
}
}
fn insert_block(
&mut self,
block: SealedBlockWithSenders,
) -> Result<InsertPayloadOk, InsertBlockError> {
self.insert_block_inner(block.clone())
.map_err(|kind| InsertBlockError::new(block.block, kind))
}
fn insert_block_inner(
&mut self,
block: SealedBlockWithSenders,
) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
if self.block_by_hash(block.hash())?.is_some() {
let attachment = BlockAttachment::Canonical; // TODO: remove or revise attachment
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid(attachment)))
}
// validate block consensus rules
self.validate_block(&block)?;
let state_provider = self.state_provider(block.parent_hash).unwrap();
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
let block_number = block.number;
let block_hash = block.hash();
let block = block.unseal();
let output = executor.execute((&block, U256::MAX).into()).unwrap();
self.consensus.validate_block_post_execution(
&block,
PostExecutionInput::new(&output.receipts, &output.requests),
)?;
let hashed_state = HashedPostState::from_bundle_state(&output.state.state);
// TODO: compute and validate state root
let trie_output = TrieUpdates::default();
let executed = ExecutedBlock {
block: Arc::new(block.block.seal(block_hash)),
senders: Arc::new(block.senders),
execution_output: Arc::new(ExecutionOutcome::new(
output.state,
Receipts::from(output.receipts),
block_number,
vec![Requests::from(output.requests)],
)),
hashed_state: Arc::new(hashed_state),
trie: Arc::new(trie_output),
};
self.state.tree_state.insert_executed(executed);
let attachment = BlockAttachment::Canonical; // TODO: remove or revise attachment
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)))
}
}
impl<P, E, T> EngineApiTreeHandler for EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory + Clone,
E: BlockExecutorProvider,
T: EngineTypes,
{
type Engine = T;
fn on_downloaded(&mut self, _blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
todo!()
}
fn on_new_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> ProviderResult<TreeOutcome<PayloadStatus>> {
// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
// - missing or invalid base fee
// - invalid extra data
// - invalid transactions
// - incorrect hash
// - the versioned hashes passed with the payload do not exactly match transaction
// versioned hashes
// - the block does not contain blob transactions if it is pre-cancun
//
// This validates the following engine API rule:
//
// 3. Given the expected array of blob versioned hashes client software **MUST** run its
// validation by taking the following steps:
//
// 1. Obtain the actual array by concatenating blob versioned hashes lists
// (`tx.blob_versioned_hashes`) of each [blob
// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
// in the payload, respecting the order of inclusion. If the payload has no blob
// transactions the expected array **MUST** be `[]`.
//
// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
// null}` if the expected and the actual arrays don't match.
//
// This validation **MUST** be instantly run in all cases even during active sync process.
let parent_hash = payload.parent_hash();
let block = match self
.payload_validator
.ensure_well_formed_payload(payload, cancun_fields.into())
{
Ok(block) => block,
Err(error) => {
error!(target: "engine::tree", %error, "Invalid payload");
// we need to convert the error to a payload status (response to the CL)
let latest_valid_hash =
if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
// Engine-API rules:
// > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
// > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
None
} else {
self.latest_valid_hash_for_invalid_payload(parent_hash)?
};
let status = PayloadStatusEnum::from(error);
return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
}
};
let block_hash = block.hash();
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
if lowest_buffered_ancestor == block_hash {
lowest_buffered_ancestor = block.parent_hash;
}
// now check the block itself
if let Some(status) =
self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
{
return Ok(TreeOutcome::new(status))
}
let status = if self.is_pipeline_active {
self.buffer_block_without_senders(block).unwrap();
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
} else {
let mut latest_valid_hash = None;
let status = match self.insert_block_without_senders(block).unwrap() {
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
PayloadStatusEnum::Valid
}
InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
// TODO: isn't this check redundant?
// check if the block's parent is already marked as invalid
// if let Some(status) = self
// .check_invalid_ancestor_with_head(block.parent_hash, block.hash())
// .map_err(|error| {
// InsertBlockError::new(block, InsertBlockErrorKind::Provider(error))
// })?
// {
// return Ok(status)
// }
// not known to be invalid, but we don't know anything else
PayloadStatusEnum::Syncing
}
};
PayloadStatus::new(status, latest_valid_hash)
};
let mut outcome = TreeOutcome::new(status);
if outcome.outcome.is_valid() {
if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
if target.head_block_hash == block_hash {
outcome = outcome
.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash)));
}
}
}
Ok(outcome)
}
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>> {
todo!()
}
}