From 01979c4bdea2297fdba08acdaeca8a33c3294c1a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Jul 2024 14:03:44 +0200 Subject: [PATCH] feat: new engine API handler (#8559) Co-authored-by: Roman Krasiuk Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: Federico Gimenez --- CODEOWNERS | 3 +- Cargo.lock | 44 ++ Cargo.toml | 2 + .../consensus/beacon/src/engine/forkchoice.rs | 9 +- .../beacon/src/engine/invalid_headers.rs | 7 +- crates/consensus/beacon/src/engine/mod.rs | 7 +- crates/engine/tree/Cargo.toml | 64 ++ crates/engine/tree/src/backfill.rs | 342 +++++++++++ crates/engine/tree/src/chain.rs | 218 +++++++ crates/engine/tree/src/download.rs | 414 +++++++++++++ crates/engine/tree/src/engine.rs | 212 +++++++ crates/engine/tree/src/lib.rs | 31 + crates/engine/tree/src/metrics.rs | 9 + crates/engine/tree/src/persistence.rs | 139 +++++ crates/engine/tree/src/test_utils.rs | 21 + crates/engine/tree/src/tree/memory_overlay.rs | 123 ++++ crates/engine/tree/src/tree/mod.rs | 574 ++++++++++++++++++ 17 files changed, 2208 insertions(+), 11 deletions(-) create mode 100644 crates/engine/tree/Cargo.toml create mode 100644 crates/engine/tree/src/backfill.rs create mode 100644 crates/engine/tree/src/chain.rs create mode 100644 crates/engine/tree/src/download.rs create mode 100644 crates/engine/tree/src/engine.rs create mode 100644 crates/engine/tree/src/lib.rs create mode 100644 crates/engine/tree/src/metrics.rs create mode 100644 crates/engine/tree/src/persistence.rs create mode 100644 crates/engine/tree/src/test_utils.rs create mode 100644 crates/engine/tree/src/tree/memory_overlay.rs create mode 100644 crates/engine/tree/src/tree/mod.rs diff --git a/CODEOWNERS b/CODEOWNERS index 11c19aa43..225d0f08b 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -6,8 +6,9 @@ crates/chainspec/ @Rjected @joshieDo @mattsse crates/cli/ @onbjerg @mattsse crates/config/ @onbjerg crates/consensus/ @rkrasiuk @mattsse @Rjected +crates/engine @rkrasiuk @mattsse @Rjected crates/e2e-test-utils/ @mattsse @Rjected -crates/engine-primitives/ @rkrasiuk @mattsse @Rjected +crates/engine/ @rkrasiuk @mattsse @Rjected @fgimenez crates/errors/ @mattsse crates/ethereum/ @mattsse @Rjected crates/ethereum-forks/ @mattsse @Rjected diff --git a/Cargo.lock b/Cargo.lock index 962e60497..0bdada0ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6966,6 +6966,50 @@ dependencies = [ "serde", ] +[[package]] +name = "reth-engine-tree" +version = "1.0.0" +dependencies = [ + "aquamarine", + "assert_matches", + "futures", + "metrics", + "parking_lot 0.12.3", + "reth-beacon-consensus", + "reth-blockchain-tree", + "reth-blockchain-tree-api", + "reth-chainspec", + "reth-consensus", + "reth-db", + "reth-db-api", + "reth-engine-primitives", + "reth-errors", + "reth-ethereum-consensus", + "reth-evm", + "reth-metrics", + "reth-network-p2p", + "reth-payload-builder", + "reth-payload-primitives", + "reth-payload-validator", + "reth-primitives", + "reth-provider", + "reth-prune", + "reth-prune-types", + "reth-revm", + "reth-rpc-types", + "reth-stages", + "reth-stages-api", + "reth-static-file", + "reth-tasks", + "reth-tokio-util", + "reth-tracing", + "reth-trie", + "revm", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-engine-util" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 7110a3919..1308361d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "crates/ethereum-forks/", "crates/e2e-test-utils/", "crates/engine/primitives/", + "crates/engine/tree/", 
"crates/engine/util/", "crates/errors/", "crates/ethereum-forks/", @@ -287,6 +288,7 @@ reth-downloaders = { path = "crates/net/downloaders" } reth-e2e-test-utils = { path = "crates/e2e-test-utils" } reth-ecies = { path = "crates/net/ecies" } reth-engine-primitives = { path = "crates/engine/primitives" } +reth-engine-tree = { path = "crates/engine/tree" } reth-engine-util = { path = "crates/engine/util" } reth-errors = { path = "crates/errors" } reth-eth-wire = { path = "crates/net/eth-wire" } diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs index afd19f607..491d0ff8a 100644 --- a/crates/consensus/beacon/src/engine/forkchoice.rs +++ b/crates/consensus/beacon/src/engine/forkchoice.rs @@ -3,7 +3,7 @@ use reth_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum}; /// The struct that keeps track of the received forkchoice state and their status. #[derive(Debug, Clone, Default)] -pub(crate) struct ForkchoiceStateTracker { +pub struct ForkchoiceStateTracker { /// The latest forkchoice state that we received. /// /// Caution: this can be invalid. @@ -76,7 +76,7 @@ impl ForkchoiceStateTracker { } /// Returns the last received `ForkchoiceState` to which we need to sync. - pub(crate) const fn sync_target_state(&self) -> Option { + pub const fn sync_target_state(&self) -> Option { self.last_syncing } @@ -139,9 +139,12 @@ impl From for ForkchoiceStatus { /// A helper type to check represent hashes of a [`ForkchoiceState`] #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) enum ForkchoiceStateHash { +pub enum ForkchoiceStateHash { + /// Head hash of the [`ForkchoiceState`]. Head(B256), + /// Safe hash of the [`ForkchoiceState`]. Safe(B256), + /// Finalized hash of the [`ForkchoiceState`]. Finalized(B256), } diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index 2a37c6001..ebce1faf9 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -14,7 +14,8 @@ use tracing::warn; const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128; /// Keeps track of invalid headers. -pub(crate) struct InvalidHeaderCache { +#[derive(Debug)] +pub struct InvalidHeaderCache { /// This maps a header hash to a reference to its invalid ancestor. headers: LruMap, /// Metrics for the cache. @@ -34,7 +35,7 @@ impl InvalidHeaderCache { /// /// If this is called, the hit count for the entry is incremented. /// If the hit count exceeds the threshold, the entry is evicted and `None` is returned. - pub(crate) fn get(&mut self, hash: &B256) -> Option> { + pub fn get(&mut self, hash: &B256) -> Option> { { let entry = self.headers.get(hash)?; entry.hit_count += 1; @@ -49,7 +50,7 @@ impl InvalidHeaderCache { } /// Inserts an invalid block into the cache, with a given invalid ancestor. - pub(crate) fn insert_with_invalid_ancestor( + pub fn insert_with_invalid_ancestor( &mut self, header_hash: B256, invalid_ancestor: Arc
, diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index d08dcc259..eba82c295 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -53,7 +53,7 @@ pub use error::{ }; mod invalid_headers; -use invalid_headers::InvalidHeaderCache; +pub use invalid_headers::InvalidHeaderCache; mod event; pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress}; @@ -62,13 +62,12 @@ mod handle; pub use handle::BeaconConsensusEngineHandle; mod forkchoice; -pub use forkchoice::ForkchoiceStatus; -use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}; +pub use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus}; mod metrics; use metrics::EngineMetrics; -pub(crate) mod sync; +pub mod sync; use sync::{EngineSyncController, EngineSyncEvent}; /// Hooks for running during the main loop of diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml new file mode 100644 index 000000000..bcc8ae34b --- /dev/null +++ b/crates/engine/tree/Cargo.toml @@ -0,0 +1,64 @@ +[package] +name = "reth-engine-tree" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# reth +reth-beacon-consensus.workspace = true +reth-blockchain-tree.workspace = true +reth-blockchain-tree-api.workspace = true +reth-chainspec.workspace = true +reth-consensus.workspace = true +reth-db.workspace = true +reth-db-api.workspace = true +reth-engine-primitives.workspace = true +reth-errors.workspace = true +reth-ethereum-consensus.workspace = true +reth-evm.workspace = true +reth-network-p2p.workspace = true +reth-payload-builder.workspace = true +reth-payload-primitives.workspace = true +reth-payload-validator.workspace = true +reth-primitives.workspace = true +reth-provider.workspace = true +reth-prune.workspace = true +reth-revm.workspace = true +reth-rpc-types.workspace = true +reth-stages-api.workspace = true +reth-static-file.workspace = true +reth-tasks.workspace = true +reth-tokio-util.workspace = true +reth-trie.workspace = true +revm.workspace = true + +# common +futures.workspace = true +tokio = { workspace = true, features = ["macros", "sync"] } +tokio-stream = { workspace = true, features = ["sync"] } + + +# metrics +metrics.workspace = true +reth-metrics = { workspace = true, features = ["common"] } + +# misc +aquamarine.workspace = true +parking_lot.workspace = true +tracing.workspace = true + +[dev-dependencies] +# reth +reth-network-p2p = { workspace = true, features = ["test-utils"] } +reth-prune-types.workspace = true +reth-stages = { workspace = true, features = ["test-utils"] } +reth-tracing.workspace = true + +assert_matches.workspace = true \ No newline at end of file diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs new file mode 100644 index 000000000..ec0b4ef06 --- /dev/null +++ b/crates/engine/tree/src/backfill.rs @@ -0,0 +1,342 @@ +//! It is expected that the node has two sync modes: +//! +//! - Backfill sync: Sync to a certain block height in stages, e.g. download data from p2p then +//! execute that range. +//! - Live sync: In this mode the nodes is keeping up with the latest tip and listens for new +//! requests from the consensus client. +//! +//! These modes are mutually exclusive and the node can only be in one mode at a time. 
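+//!
+//! Backfill sync is driven through an action/event loop: the caller requests a run with
+//! [`BackfillAction::Start`] and then keeps polling the implementation until it reports
+//! [`BackfillEvent::Finished`]. A rough sketch of that call pattern, where `pipeline`,
+//! `task_spawner` and `tip` stand in for values supplied by the node:
+//!
+//! ```rust,ignore
+//! let mut backfill = PipelineSync::new(pipeline, task_spawner);
+//! backfill.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));
+//! let outcome = loop {
+//!     match std::future::poll_fn(|cx| backfill.poll(cx)).await {
+//!         BackfillEvent::Finished(result) => break result,
+//!         other => tracing::debug!(?other, "backfill event"),
+//!     }
+//! };
+//! ```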
+ +use futures::FutureExt; +use reth_db_api::database::Database; +use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult}; +use reth_tasks::TaskSpawner; +use std::task::{ready, Context, Poll}; +use tokio::sync::oneshot; +use tracing::trace; + +/// Backfill sync mode functionality. +pub trait BackfillSync: Send + Sync { + /// Performs a backfill action. + fn on_action(&mut self, action: BackfillAction); + + /// Polls the pipeline for completion. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; +} + +/// The backfill actions that can be performed. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BackfillAction { + /// Start backfilling with the given target. + Start(PipelineTarget), +} + +/// The events that can be emitted on backfill sync. +#[derive(Debug)] +pub enum BackfillEvent { + /// Backfill sync idle. + Idle, + /// Backfill sync started. + Started(PipelineTarget), + /// Backfill sync finished. + /// + /// If this is returned, backfill sync is idle. + Finished(Result), + /// Sync task was dropped after it was started, unable to receive it because + /// channel closed. This would indicate a panicked task. + TaskDropped(String), +} + +/// Pipeline sync. +#[derive(Debug)] +pub struct PipelineSync +where + DB: Database, +{ + /// The type that can spawn the pipeline task. + pipeline_task_spawner: Box, + /// The current state of the pipeline. + /// The pipeline is used for large ranges. + pipeline_state: PipelineState, + /// Pending target block for the pipeline to sync + pending_pipeline_target: Option, +} + +impl PipelineSync +where + DB: Database + 'static, +{ + /// Create a new instance. + pub fn new(pipeline: Pipeline, pipeline_task_spawner: Box) -> Self { + Self { + pipeline_task_spawner, + pipeline_state: PipelineState::Idle(Some(pipeline)), + pending_pipeline_target: None, + } + } + + /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. + #[allow(dead_code)] + const fn is_pipeline_sync_pending(&self) -> bool { + self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle() + } + + /// Returns `true` if the pipeline is idle. + const fn is_pipeline_idle(&self) -> bool { + self.pipeline_state.is_idle() + } + + /// Returns `true` if the pipeline is active. + const fn is_pipeline_active(&self) -> bool { + !self.is_pipeline_idle() + } + + /// Sets a new target to sync the pipeline to. + /// + /// But ensures the target is not the zero hash. + fn set_pipeline_sync_target(&mut self, target: PipelineTarget) { + if target.sync_target().is_some_and(|target| target.is_zero()) { + trace!( + target: "consensus::engine::sync", + "Pipeline target cannot be zero hash." + ); + // precaution to never sync to the zero hash + return + } + self.pending_pipeline_target = Some(target); + } + + /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to + /// run continuously. 
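+    ///
+    /// On success this consumes the pending target, moves the pipeline into
+    /// [`PipelineState::Running`] with a oneshot receiver for its result, and returns
+    /// [`BackfillEvent::Started`]. If no target is queued or the pipeline is already running,
+    /// `None` is returned.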
+ fn try_spawn_pipeline(&mut self) -> Option { + match &mut self.pipeline_state { + PipelineState::Idle(pipeline) => { + let target = self.pending_pipeline_target.take()?; + let (tx, rx) = oneshot::channel(); + + let pipeline = pipeline.take().expect("exists"); + self.pipeline_task_spawner.spawn_critical_blocking( + "pipeline task", + Box::pin(async move { + let result = pipeline.run_as_fut(Some(target)).await; + let _ = tx.send(result); + }), + ); + self.pipeline_state = PipelineState::Running(rx); + + Some(BackfillEvent::Started(target)) + } + PipelineState::Running(_) => None, + } + } + + /// Advances the pipeline state. + /// + /// This checks for the result in the channel, or returns pending if the pipeline is idle. + fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll { + let res = match self.pipeline_state { + PipelineState::Idle(_) => return Poll::Pending, + PipelineState::Running(ref mut fut) => { + ready!(fut.poll_unpin(cx)) + } + }; + let ev = match res { + Ok((_, result)) => BackfillEvent::Finished(result), + Err(why) => { + // failed to receive the pipeline + BackfillEvent::TaskDropped(why.to_string()) + } + }; + Poll::Ready(ev) + } +} + +impl BackfillSync for PipelineSync +where + DB: Database + 'static, +{ + fn on_action(&mut self, event: BackfillAction) { + match event { + BackfillAction::Start(target) => self.set_pipeline_sync_target(target), + } + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // try to spawn a pipeline if a target is set + if let Some(event) = self.try_spawn_pipeline() { + return Poll::Ready(event) + } + + // make sure we poll the pipeline if it's active, and return any ready pipeline events + if !self.is_pipeline_idle() { + // advance the pipeline + if let Poll::Ready(event) = self.poll_pipeline(cx) { + return Poll::Ready(event) + } + } + + Poll::Pending + } +} + +/// The possible pipeline states within the sync controller. +/// +/// [`PipelineState::Idle`] means that the pipeline is currently idle. +/// [`PipelineState::Running`] means that the pipeline is currently running. +/// +/// NOTE: The differentiation between these two states is important, because when the pipeline is +/// running, it acquires the write lock over the database. This means that we cannot forward to the +/// blockchain tree any messages that would result in database writes, since it would result in a +/// deadlock. +#[derive(Debug)] +enum PipelineState { + /// Pipeline is idle. + Idle(Option>), + /// Pipeline is running and waiting for a response + Running(oneshot::Receiver>), +} + +impl PipelineState { + /// Returns `true` if the state matches idle. 
+ const fn is_idle(&self) -> bool { + matches!(self, Self::Idle(_)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::insert_headers_into_client; + use assert_matches::assert_matches; + use futures::poll; + use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET}; + use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; + use reth_network_p2p::test_utils::TestFullBlockClient; + use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockNumber, Header, B256}; + use reth_provider::{ + test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome, + }; + use reth_prune_types::PruneModes; + use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; + use reth_stages_api::StageCheckpoint; + use reth_static_file::StaticFileProducer; + use reth_tasks::TokioTaskExecutor; + use std::{collections::VecDeque, future::poll_fn, sync::Arc}; + use tokio::sync::watch; + + struct TestHarness { + pipeline_sync: PipelineSync>>, + tip: B256, + } + + impl TestHarness { + fn new(total_blocks: usize, pipeline_done_after: u64) -> Self { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + // force the pipeline to be "done" after 5 blocks + let pipeline = TestPipelineBuilder::new() + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)), + done: true, + })])) + .build(chain_spec); + + let pipeline_sync = PipelineSync::new(pipeline, Box::::default()); + let client = TestFullBlockClient::default(); + let header = Header { + base_fee_per_gas: Some(7), + gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, + ..Default::default() + } + .seal_slow(); + insert_headers_into_client(&client, header, 0..total_blocks); + + let tip = client.highest_block().expect("there should be blocks here").hash(); + + Self { pipeline_sync, tip } + } + } + + struct TestPipelineBuilder { + pipeline_exec_outputs: VecDeque>, + executor_results: Vec, + } + + impl TestPipelineBuilder { + /// Create a new [`TestPipelineBuilder`]. + const fn new() -> Self { + Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() } + } + + /// Set the pipeline execution outputs to use for the test consensus engine. + fn with_pipeline_exec_outputs( + mut self, + pipeline_exec_outputs: VecDeque>, + ) -> Self { + self.pipeline_exec_outputs = pipeline_exec_outputs; + self + } + + /// Set the executor results to use for the test consensus engine. + #[allow(dead_code)] + fn with_executor_results(mut self, executor_results: Vec) -> Self { + self.executor_results = executor_results; + self + } + + /// Builds the pipeline. 
+ fn build(self, chain_spec: Arc) -> Pipeline>> { + reth_tracing::init_test_tracing(); + + // Setup pipeline + let (tip_tx, _tip_rx) = watch::channel(B256::default()); + let pipeline = Pipeline::builder() + .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx); + + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec); + + let static_file_producer = + StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); + + pipeline.build(provider_factory, static_file_producer) + } + } + + #[tokio::test] + async fn pipeline_started_and_finished() { + const TOTAL_BLOCKS: usize = 10; + const PIPELINE_DONE_AFTER: u64 = 5; + let TestHarness { mut pipeline_sync, tip } = + TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER); + + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_event = poll!(sync_future); + + // sync target not set, pipeline not started + assert_matches!(next_event, Poll::Pending); + + pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip))); + + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_event = poll!(sync_future); + + // sync target set, pipeline started + assert_matches!(next_event, Poll::Ready(BackfillEvent::Started(target)) => { + assert_eq!(target.sync_target().unwrap(), tip); + }); + + // the next event should be the pipeline finishing in a good state + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_ready = sync_future.await; + assert_matches!(next_ready, BackfillEvent::Finished(result) => { + assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER })); + }); + } +} diff --git a/crates/engine/tree/src/chain.rs b/crates/engine/tree/src/chain.rs new file mode 100644 index 000000000..97c6d615c --- /dev/null +++ b/crates/engine/tree/src/chain.rs @@ -0,0 +1,218 @@ +use crate::backfill::{BackfillAction, BackfillEvent, BackfillSync}; +use futures::Stream; +use reth_stages_api::PipelineTarget; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// The type that drives the chain forward. +/// +/// A state machine that orchestrates the components responsible for advancing the chain +/// +/// +/// ## Control flow +/// +/// The [`ChainOrchestrator`] is responsible for controlling the pipeline sync and additional hooks. +/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the +/// handler. However, due to database restrictions (e.g. exclusive write access), following +/// invariants apply: +/// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must +/// ensure that while the pipeline is running, no other write access is granted. +/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database +/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the +/// request for write access. +/// +/// The [`ChainOrchestrator`] polls the [`ChainHandler`] to advance the chain and handles the +/// emitted events. Requests and events are passed to the [`ChainHandler`] via +/// [`ChainHandler::on_event`]. +#[must_use = "Stream does nothing unless polled"] +#[derive(Debug)] +pub struct ChainOrchestrator +where + T: ChainHandler, + P: BackfillSync, +{ + /// The handler for advancing the chain. + handler: T, + /// Controls pipeline sync. 
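+    ///
+    /// A backfill run is started when the handler emits [`HandlerEvent::Pipeline`]: the
+    /// orchestrator forwards a [`BackfillAction::Start`] to this type and keeps polling it until
+    /// the run finishes, notifying the handler via [`FromOrchestrator`] on start and completion.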
+ pipeline: P, +} + +impl ChainOrchestrator +where + T: ChainHandler + Unpin, + P: BackfillSync + Unpin, +{ + /// Creates a new [`ChainOrchestrator`] with the given handler and pipeline. + pub const fn new(handler: T, pipeline: P) -> Self { + Self { handler, pipeline } + } + + /// Returns the handler + pub const fn handler(&self) -> &T { + &self.handler + } + + /// Returns a mutable reference to the handler + pub fn handler_mut(&mut self) -> &mut T { + &mut self.handler + } + + /// Internal function used to advance the chain. + /// + /// Polls the `ChainOrchestrator` for the next event. + #[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))] + fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // This loop polls the components + // + // 1. Polls the pipeline to completion, if active. + // 2. Advances the chain by polling the handler. + 'outer: loop { + // try to poll the pipeline to completion, if active + match this.pipeline.poll(cx) { + Poll::Ready(pipeline_event) => match pipeline_event { + BackfillEvent::Idle => {} + BackfillEvent::Started(_) => { + // notify handler that pipeline started + this.handler.on_event(FromOrchestrator::PipelineStarted); + return Poll::Ready(ChainEvent::PipelineStarted); + } + BackfillEvent::Finished(res) => { + return match res { + Ok(event) => { + tracing::debug!(?event, "pipeline finished"); + // notify handler that pipeline finished + this.handler.on_event(FromOrchestrator::PipelineFinished); + Poll::Ready(ChainEvent::PipelineFinished) + } + Err(err) => { + tracing::error!( %err, "pipeline failed"); + Poll::Ready(ChainEvent::FatalError) + } + } + } + BackfillEvent::TaskDropped(err) => { + tracing::error!( %err, "pipeline task dropped"); + return Poll::Ready(ChainEvent::FatalError); + } + }, + Poll::Pending => {} + } + + // poll the handler for the next event + match this.handler.poll(cx) { + Poll::Ready(handler_event) => { + match handler_event { + HandlerEvent::Pipeline(target) => { + // trigger pipeline and start polling it + this.pipeline.on_action(BackfillAction::Start(target)); + continue 'outer + } + HandlerEvent::Event(ev) => { + // bubble up the event + return Poll::Ready(ChainEvent::Handler(ev)); + } + } + } + Poll::Pending => { + // no more events to process + break 'outer + } + } + } + + Poll::Pending + } +} + +impl Stream for ChainOrchestrator +where + T: ChainHandler + Unpin, + P: BackfillSync + Unpin, +{ + type Item = ChainEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().poll_next_event(cx).map(Some) + } +} + +/// Represents the sync mode the chain is operating in. +#[derive(Debug, Default)] +enum SyncMode { + #[default] + Handler, + Backfill, +} + +/// Event emitted by the [`ChainOrchestrator`] +/// +/// These are meant to be used for observability and debugging purposes. +#[derive(Debug)] +pub enum ChainEvent { + /// Pipeline sync started + PipelineStarted, + /// Pipeline sync finished + PipelineFinished, + /// Fatal error + FatalError, + /// Event emitted by the handler + Handler(T), +} + +/// A trait that advances the chain by handling actions. +/// +/// This is intended to be implement the chain consensus logic, for example `engine` API. +pub trait ChainHandler: Send + Sync { + /// Event generated by this handler that orchestrator can bubble up; + type Event: Send; + + /// Informs the handler about an event from the [`ChainOrchestrator`]. 
+ fn on_event(&mut self, event: FromOrchestrator); + + /// Polls for actions that [`ChainOrchestrator`] should handle. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; +} + +/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`]. +#[derive(Clone, Debug)] +pub enum HandlerEvent { + /// Request to start a pipeline sync + Pipeline(PipelineTarget), + /// Other event emitted by the handler + Event(T), +} + +/// Internal events issued by the [`ChainOrchestrator`]. +#[derive(Clone, Debug)] +pub enum FromOrchestrator { + /// Invoked when pipeline sync finished + PipelineFinished, + /// Invoked when pipeline started + PipelineStarted, +} + +/// Represents the state of the chain. +#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)] +pub enum OrchestratorState { + /// Orchestrator has exclusive write access to the database. + PipelineActive, + /// Node is actively processing the chain. + #[default] + Idle, +} + +impl OrchestratorState { + /// Returns `true` if the state is [`OrchestratorState::PipelineActive`]. + pub const fn is_pipeline_active(&self) -> bool { + matches!(self, Self::PipelineActive) + } + + /// Returns `true` if the state is [`OrchestratorState::Idle`]. + pub const fn is_idle(&self) -> bool { + matches!(self, Self::Idle) + } +} diff --git a/crates/engine/tree/src/download.rs b/crates/engine/tree/src/download.rs new file mode 100644 index 000000000..12b2bd189 --- /dev/null +++ b/crates/engine/tree/src/download.rs @@ -0,0 +1,414 @@ +//! Handler that can download blocks on demand (e.g. from the network). + +use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics}; +use futures::FutureExt; +use reth_consensus::Consensus; +use reth_network_p2p::{ + bodies::client::BodiesClient, + full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, + headers::client::HeadersClient, +}; +use reth_primitives::{SealedBlock, SealedBlockWithSenders, B256}; +use std::{ + cmp::{Ordering, Reverse}, + collections::{binary_heap::PeekMut, BinaryHeap, HashSet}, + sync::Arc, + task::{Context, Poll}, +}; +use tracing::trace; + +/// A trait that can download blocks on demand. +pub trait BlockDownloader: Send + Sync { + /// Handle an action. + fn on_action(&mut self, event: DownloadAction); + + /// Advance in progress requests if any + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; +} + +/// Actions that can be performed by the block downloader. +#[derive(Debug)] +pub enum DownloadAction { + /// Stop downloading blocks. + Clear, + /// Download given blocks + Download(DownloadRequest), +} + +/// Outcome of downloaded blocks. +#[derive(Debug)] +pub enum DownloadOutcome { + /// Downloaded blocks. + Blocks(Vec), +} + +/// Basic [`BlockDownloader`]. +pub struct BasicBlockDownloader +where + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + /// A downloader that can download full blocks from the network. + full_block_client: FullBlockClient, + /// In-flight full block requests in progress. + inflight_full_block_requests: Vec>, + /// In-flight full block _range_ requests in progress. + inflight_block_range_requests: Vec>, + /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for + /// ordering. This means the blocks will be popped from the heap with ascending block numbers. + set_buffered_blocks: BinaryHeap>, + /// Engine download metrics. 
+ metrics: BlockDownloaderMetrics, +} + +impl BasicBlockDownloader +where + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + /// Create a new instance + pub(crate) fn new(client: Client, consensus: Arc) -> Self { + Self { + full_block_client: FullBlockClient::new(client, consensus), + inflight_full_block_requests: Vec::new(), + inflight_block_range_requests: Vec::new(), + set_buffered_blocks: BinaryHeap::new(), + metrics: BlockDownloaderMetrics::default(), + } + } + + /// Clears the stored inflight requests. + fn clear(&mut self) { + self.inflight_full_block_requests.clear(); + self.inflight_block_range_requests.clear(); + self.set_buffered_blocks.clear(); + self.update_block_download_metrics(); + } + + /// Processes a download request. + fn download(&mut self, request: DownloadRequest) { + match request { + DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes), + DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count), + } + } + + /// Processes a block set download request. + fn download_block_set(&mut self, hashes: HashSet) { + for hash in hashes { + self.download_full_block(hash); + } + } + + /// Processes a block range download request. + fn download_block_range(&mut self, hash: B256, count: u64) { + if count == 1 { + self.download_full_block(hash); + } else { + trace!( + target: "consensus::engine", + ?hash, + ?count, + "start downloading full block range." + ); + + let request = self.full_block_client.get_full_block_range(hash, count); + self.inflight_block_range_requests.push(request); + } + } + + /// Starts requesting a full block from the network. + /// + /// Returns `true` if the request was started, `false` if there's already a request for the + /// given hash. + fn download_full_block(&mut self, hash: B256) -> bool { + if self.is_inflight_request(hash) { + return false + } + trace!( + target: "consensus::engine::sync", + ?hash, + "Start downloading full block" + ); + + let request = self.full_block_client.get_full_block(hash); + self.inflight_full_block_requests.push(request); + + self.update_block_download_metrics(); + + true + } + + /// Returns true if there's already a request for the given hash. + fn is_inflight_request(&self, hash: B256) -> bool { + self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash) + } + + /// Sets the metrics for the active downloads + fn update_block_download_metrics(&self) { + self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); + // TODO: full block range metrics + } +} + +impl BlockDownloader for BasicBlockDownloader +where + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + /// Handles incoming download actions. + fn on_action(&mut self, event: DownloadAction) { + match event { + DownloadAction::Clear => self.clear(), + DownloadAction::Download(request) => self.download(request), + } + } + + /// Advances the download process. 
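+    ///
+    /// Completed single-block and block-range requests are pushed into a min-heap keyed by
+    /// block number. Once at least one block is buffered, the heap is drained (skipping
+    /// duplicate hashes) and the blocks are returned in ascending order via
+    /// [`DownloadOutcome::Blocks`]; otherwise this returns [`Poll::Pending`].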
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // advance all full block requests + for idx in (0..self.inflight_full_block_requests.len()).rev() { + let mut request = self.inflight_full_block_requests.swap_remove(idx); + if let Poll::Ready(block) = request.poll_unpin(cx) { + trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering"); + self.set_buffered_blocks.push(Reverse(block.into())); + } else { + // still pending + self.inflight_full_block_requests.push(request); + } + } + + // advance all full block range requests + for idx in (0..self.inflight_block_range_requests.len()).rev() { + let mut request = self.inflight_block_range_requests.swap_remove(idx); + if let Poll::Ready(blocks) = request.poll_unpin(cx) { + trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering"); + self.set_buffered_blocks.extend( + blocks + .into_iter() + .map(|b| { + let senders = b.senders().unwrap_or_default(); + OrderedSealedBlockWithSenders(SealedBlockWithSenders { + block: b, + senders, + }) + }) + .map(Reverse), + ); + } else { + // still pending + self.inflight_block_range_requests.push(request); + } + } + + self.update_block_download_metrics(); + + if self.set_buffered_blocks.is_empty() { + return Poll::Pending; + } + + // drain all unique element of the block buffer if there are any + let mut downloaded_blocks: Vec = + Vec::with_capacity(self.set_buffered_blocks.len()); + while let Some(block) = self.set_buffered_blocks.pop() { + // peek ahead and pop duplicates + while let Some(peek) = self.set_buffered_blocks.peek_mut() { + if peek.0 .0.hash() == block.0 .0.hash() { + PeekMut::pop(peek); + } else { + break + } + } + downloaded_blocks.push(block.0.into()); + } + Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks)) + } +} + +/// A wrapper type around [`SealedBlockWithSenders`] that implements the [Ord] +/// trait by block number. +#[derive(Debug, Clone, PartialEq, Eq)] +struct OrderedSealedBlockWithSenders(SealedBlockWithSenders); + +impl PartialOrd for OrderedSealedBlockWithSenders { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OrderedSealedBlockWithSenders { + fn cmp(&self, other: &Self) -> Ordering { + self.0.number.cmp(&other.0.number) + } +} + +impl From for OrderedSealedBlockWithSenders { + fn from(block: SealedBlock) -> Self { + let senders = block.senders().unwrap_or_default(); + Self(SealedBlockWithSenders { block, senders }) + } +} + +impl From for SealedBlockWithSenders { + fn from(value: OrderedSealedBlockWithSenders) -> Self { + let senders = value.0.senders; + Self { block: value.0.block, senders } + } +} + +/// A [`BlockDownloader`] that does nothing. 
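+///
+/// Its `on_action` ignores all actions and its `poll` never resolves, so it can be plugged in
+/// wherever a downloader is required but blocks never need to be fetched from the network.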
+#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct NoopBlockDownloader; + +impl BlockDownloader for NoopBlockDownloader { + fn on_action(&mut self, _event: DownloadAction) {} + + fn poll(&mut self, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::insert_headers_into_client; + use assert_matches::assert_matches; + use reth_beacon_consensus::EthBeaconConsensus; + use reth_chainspec::{ChainSpecBuilder, MAINNET}; + use reth_network_p2p::test_utils::TestFullBlockClient; + use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, Header}; + use std::{future::poll_fn, sync::Arc}; + + struct TestHarness { + block_downloader: BasicBlockDownloader, + client: TestFullBlockClient, + } + + impl TestHarness { + fn new(total_blocks: usize) -> Self { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + let client = TestFullBlockClient::default(); + let header = Header { + base_fee_per_gas: Some(7), + gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, + ..Default::default() + } + .seal_slow(); + + insert_headers_into_client(&client, header, 0..total_blocks); + let consensus = Arc::new(EthBeaconConsensus::new(chain_spec)); + + let block_downloader = BasicBlockDownloader::new(client.clone(), consensus); + Self { block_downloader, client } + } + } + + #[tokio::test] + async fn block_downloader_range_request() { + const TOTAL_BLOCKS: usize = 10; + let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS); + let tip = client.highest_block().expect("there should be blocks here"); + + // send block range download request + block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange( + tip.hash(), + tip.number, + ))); + + // ensure we have one in flight range request + assert_eq!(block_downloader.inflight_block_range_requests.len(), 1); + + // ensure the range request is made correctly + let first_req = block_downloader.inflight_block_range_requests.first().unwrap(); + assert_eq!(first_req.start_hash(), tip.hash()); + assert_eq!(first_req.count(), tip.number); + + // poll downloader + let sync_future = poll_fn(|cx| block_downloader.poll(cx)); + let next_ready = sync_future.await; + + assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => { + // ensure all blocks were obtained + assert_eq!(blocks.len(), TOTAL_BLOCKS); + + // ensure they are in ascending order + for num in 1..=TOTAL_BLOCKS { + assert_eq!(blocks[num-1].number, num as u64); + } + }); + } + + #[tokio::test] + async fn block_downloader_set_request() { + const TOTAL_BLOCKS: usize = 2; + let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS); + + let tip = client.highest_block().expect("there should be blocks here"); + + // send block set download request + block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet( + HashSet::from([tip.hash(), tip.parent_hash]), + ))); + + // ensure we have TOTAL_BLOCKS in flight full block request + assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS); + + // poll downloader + let sync_future = poll_fn(|cx| block_downloader.poll(cx)); + let next_ready = sync_future.await; + + assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => { + // ensure all blocks were obtained + assert_eq!(blocks.len(), TOTAL_BLOCKS); + + // ensure they are in ascending order + for num in 1..=TOTAL_BLOCKS { + 
assert_eq!(blocks[num-1].number, num as u64); + } + }); + } + + #[tokio::test] + async fn block_downloader_clear_request() { + const TOTAL_BLOCKS: usize = 10; + let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS); + + let tip = client.highest_block().expect("there should be blocks here"); + + // send block range download request + block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange( + tip.hash(), + tip.number, + ))); + + // send block set download request + let download_set = HashSet::from([tip.hash(), tip.parent_hash]); + block_downloader + .on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone()))); + + // ensure we have one in flight range request + assert_eq!(block_downloader.inflight_block_range_requests.len(), 1); + + // ensure the range request is made correctly + let first_req = block_downloader.inflight_block_range_requests.first().unwrap(); + assert_eq!(first_req.start_hash(), tip.hash()); + assert_eq!(first_req.count(), tip.number); + + // ensure we have download_set.len() in flight full block request + assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len()); + + // send clear request + block_downloader.on_action(DownloadAction::Clear); + + // ensure we have no in flight range request + assert_eq!(block_downloader.inflight_block_range_requests.len(), 0); + + // ensure we have no in flight full block request + assert_eq!(block_downloader.inflight_full_block_requests.len(), 0); + } +} diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs new file mode 100644 index 000000000..d77162e4c --- /dev/null +++ b/crates/engine/tree/src/engine.rs @@ -0,0 +1,212 @@ +//! An engine API handler for the chain. + +use crate::{ + chain::{ChainHandler, FromOrchestrator, HandlerEvent}, + download::{BlockDownloader, DownloadAction, DownloadOutcome}, +}; +use futures::{Stream, StreamExt}; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_engine_primitives::EngineTypes; +use reth_primitives::{SealedBlockWithSenders, B256}; +use std::{ + collections::HashSet, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; + +/// Advances the chain based on incoming requests. +/// +/// This is a general purpose request handler with network access. +/// This type listens for incoming messages and processes them via the configured request handler. +/// +/// ## Overview +/// +/// This type is an orchestrator for incoming messages and responsible for delegating requests +/// received from the CL to the handler. +/// +/// It is responsible for handling the following: +/// - Downloading blocks on demand from the network if requested by the [`EngineApiRequestHandler`]. +/// +/// The core logic is part of the [`EngineRequestHandler`], which is responsible for processing the +/// incoming requests. +#[derive(Debug)] +pub struct EngineHandler { + /// Processes requests. + /// + /// This type is responsible for processing incoming requests. + handler: T, + /// Receiver for incoming requests that need to be processed. + incoming_requests: S, + /// A downloader to download blocks on demand. + downloader: D, +} + +impl EngineHandler { + /// Creates a new [`EngineHandler`] with the given handler and downloader. 
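+    ///
+    /// A rough sketch of wiring the handler into the
+    /// [`ChainOrchestrator`](crate::chain::ChainOrchestrator), where `engine_handler`,
+    /// `downloader`, `incoming_requests` and `backfill_sync` are placeholders for
+    /// node-provided components:
+    ///
+    /// ```rust,ignore
+    /// let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
+    /// let orchestrator = ChainOrchestrator::new(handler, backfill_sync);
+    /// ```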
+ pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self + where + T: EngineRequestHandler, + { + Self { handler, incoming_requests, downloader } + } +} + +impl ChainHandler for EngineHandler +where + T: EngineRequestHandler, + S: Stream + Send + Sync + Unpin + 'static, + D: BlockDownloader, +{ + type Event = T::Event; + + fn on_event(&mut self, event: FromOrchestrator) { + // delegate event to the handler + self.handler.on_event(event.into()); + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + // drain the handler first + while let Poll::Ready(ev) = self.handler.poll(cx) { + match ev { + RequestHandlerEvent::Idle => break, + RequestHandlerEvent::HandlerEvent(ev) => { + return match ev { + HandlerEvent::Pipeline(target) => { + // bubble up pipeline request + self.downloader.on_action(DownloadAction::Clear); + Poll::Ready(HandlerEvent::Pipeline(target)) + } + HandlerEvent::Event(ev) => { + // bubble up the event + Poll::Ready(HandlerEvent::Event(ev)) + } + } + } + RequestHandlerEvent::Download(req) => { + // delegate download request to the downloader + self.downloader.on_action(DownloadAction::Download(req)); + } + } + } + + // pop the next incoming request + if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) { + // and delegate the request to the handler + self.handler.on_event(FromEngine::Request(req)); + // skip downloading in this iteration to allow the handler to process the request + continue + } + + // advance the downloader + if let Poll::Ready(DownloadOutcome::Blocks(blocks)) = self.downloader.poll(cx) { + // delegate the downloaded blocks to the handler + self.handler.on_event(FromEngine::DownloadedBlocks(blocks)); + continue + } + + return Poll::Pending + } + } +} + +/// A type that processes incoming requests (e.g. requests from the consensus layer, engine API) +pub trait EngineRequestHandler: Send + Sync { + /// Even type this handler can emit + type Event: Send; + /// The request type this handler can process. + type Request; + + /// Informs the handler about an event from the [`EngineHandler`]. + fn on_event(&mut self, event: FromEngine); + + /// Advances the handler. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; +} + +/// An [`EngineRequestHandler`] that processes engine API requests by delegating to an execution +/// task. +/// +/// This type is responsible for advancing the chain during live sync (following the tip of the +/// chain). +/// +/// It advances the chain based on received engine API requests by delegating them to the tree +/// executor. +/// +/// There are two types of requests that can be processed: +/// +/// - `on_new_payload`: Executes the payload and inserts it into the tree. These are allowed to be +/// processed concurrently. +/// - `on_forkchoice_updated`: Updates the fork choice based on the new head. These require write +/// access to the database and are skipped if the handler can't acquire exclusive access to the +/// database. +/// +/// In case required blocks are missing, the handler will request them from the network, by emitting +/// a download request upstream. +#[derive(Debug)] +pub struct EngineApiRequestHandler { + /// channel to send messages to the tree to execute the payload. + to_tree: std::sync::mpsc::Sender>>, + /// channel to receive messages from the tree. 
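+    ///
+    /// Events produced by the tree task are expected to arrive on this channel and to be
+    /// surfaced as [`EngineApiEvent`]s when the handler is polled.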
+ from_tree: mpsc::UnboundedReceiver, + // TODO add db controller +} + +impl EngineApiRequestHandler where T: EngineTypes {} + +impl EngineRequestHandler for EngineApiRequestHandler +where + T: EngineTypes, +{ + type Event = EngineApiEvent; + type Request = BeaconEngineMessage; + + fn on_event(&mut self, event: FromEngine) { + // delegate to the tree + let _ = self.to_tree.send(event); + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + todo!("poll tree and handle db") + } +} + +/// Events emitted by the engine API handler. +#[derive(Debug)] +pub enum EngineApiEvent {} + +#[derive(Debug)] +pub enum FromEngine { + /// Event from the top level orchestrator. + Event(FromOrchestrator), + /// Request from the engine + Request(Req), + /// Downloaded blocks from the network. + DownloadedBlocks(Vec), +} + +impl From for FromEngine { + fn from(event: FromOrchestrator) -> Self { + Self::Event(event) + } +} + +/// Requests produced by a [`EngineRequestHandler`]. +#[derive(Debug)] +pub enum RequestHandlerEvent { + /// The handler is idle. + Idle, + /// An event emitted by the handler. + HandlerEvent(HandlerEvent), + /// Request to download blocks. + Download(DownloadRequest), +} + +/// A request to download blocks from the network. +#[derive(Debug)] +pub enum DownloadRequest { + /// Download the given set of blocks. + BlockSet(HashSet), + /// Download the given range of blocks. + BlockRange(B256, u64), +} diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs new file mode 100644 index 000000000..52ef90e4b --- /dev/null +++ b/crates/engine/tree/src/lib.rs @@ -0,0 +1,31 @@ +//! This crate includes the core components for advancing a reth chain. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +// #![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![allow(missing_docs, dead_code, missing_debug_implementations, unused_variables)] // TODO rm + +/// Re-export of the blockchain tree API. +pub use reth_blockchain_tree_api::*; + +/// Support for backfill sync mode. +pub mod backfill; +/// The type that drives the chain forward. +pub mod chain; +/// Support for downloading blocks on demand for live sync. +pub mod download; +/// Engine Api chain handler support. +pub mod engine; +/// Metrics support. +pub mod metrics; +/// The background writer task for batch db writes. +pub mod persistence; +/// Support for interacting with the blockchain tree. +pub mod tree; + +#[cfg(test)] +mod test_utils; diff --git a/crates/engine/tree/src/metrics.rs b/crates/engine/tree/src/metrics.rs new file mode 100644 index 000000000..9579affe6 --- /dev/null +++ b/crates/engine/tree/src/metrics.rs @@ -0,0 +1,9 @@ +use reth_metrics::{metrics::Gauge, Metrics}; + +/// Metrics for the `BasicBlockDownloader`. +#[derive(Metrics)] +#[metrics(scope = "consensus.engine.beacon")] +pub(crate) struct BlockDownloaderMetrics { + /// How many blocks are currently being downloaded. 
+ pub(crate) active_block_downloads: Gauge, +} diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs new file mode 100644 index 000000000..e7bfadcc1 --- /dev/null +++ b/crates/engine/tree/src/persistence.rs @@ -0,0 +1,139 @@ +#![allow(dead_code)] + +use crate::tree::ExecutedBlock; +use reth_db::database::Database; +use reth_errors::ProviderResult; +use reth_primitives::B256; +use reth_provider::ProviderFactory; +use std::sync::mpsc::{Receiver, Sender}; +use tokio::sync::oneshot; + +/// Writes parts of reth's in memory tree state to the database. +/// +/// This is meant to be a spawned task that listens for various incoming persistence operations, +/// performing those actions on disk, and returning the result in a channel. +/// +/// There are two types of operations this task can perform: +/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted. +/// - Removing blocks from disk, returning the removed blocks. +/// +/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs +/// blocking database operations in an endless loop. +#[derive(Debug)] +pub struct Persistence { + /// The db / static file provider to use + provider: ProviderFactory, + /// Incoming requests to persist stuff + incoming: Receiver, +} + +impl Persistence { + /// Create a new persistence task + const fn new(provider: ProviderFactory, incoming: Receiver) -> Self { + Self { provider, incoming } + } + + /// Writes the cloned tree state to the database + fn write(&self, _blocks: Vec) -> ProviderResult<()> { + let mut _rw = self.provider.provider_rw()?; + todo!("implement this") + } + + /// Removes the blocks above the give block number from the database, returning them. + fn remove_blocks_above(&self, _block_number: u64) -> Vec { + todo!("implement this") + } +} + +impl Persistence +where + DB: Database + 'static, +{ + /// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`]. + fn spawn_new(provider: ProviderFactory) -> PersistenceHandle { + let (tx, rx) = std::sync::mpsc::channel(); + let task = Self::new(provider, rx); + std::thread::Builder::new() + .name("Persistence Task".to_string()) + .spawn(|| task.run()) + .unwrap(); + + PersistenceHandle::new(tx) + } +} + +impl Persistence +where + DB: Database, +{ + /// This is the main loop, that will listen to persistence events and perform the requested + /// database actions + fn run(self) { + // If the receiver errors then senders have disconnected, so the loop should then end. + while let Ok(action) = self.incoming.recv() { + match action { + PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => { + // spawn blocking so we can poll the thread later + let output = self.remove_blocks_above(new_tip_num); + sender.send(output).unwrap(); + } + PersistenceAction::SaveBlocks((blocks, sender)) => { + if blocks.is_empty() { + todo!("return error or something"); + } + let last_block_hash = blocks.last().unwrap().block().hash(); + self.write(blocks).unwrap(); + sender.send(last_block_hash).unwrap(); + } + } + } + } +} + +/// A signal to the persistence task that part of the tree state can be persisted. +#[derive(Debug)] +pub enum PersistenceAction { + /// The section of tree state that should be persisted. These blocks are expected in order of + /// increasing block number. + SaveBlocks((Vec, oneshot::Sender)), + + /// Removes the blocks above the given block number from the database. 
+ RemoveBlocksAbove((u64, oneshot::Sender>)), +} + +/// A handle to the persistence task +#[derive(Debug, Clone)] +pub struct PersistenceHandle { + /// The channel used to communicate with the persistence task + sender: Sender, +} + +impl PersistenceHandle { + /// Create a new [`PersistenceHandle`] from a [`Sender`]. + pub const fn new(sender: Sender) -> Self { + Self { sender } + } + + /// Tells the persistence task to save a certain list of finalized blocks. The blocks are + /// assumed to be ordered by block number. + /// + /// This returns the latest hash that has been saved, allowing removal of that block and any + /// previous blocks from in-memory data structures. + pub async fn save_blocks(&self, blocks: Vec) -> B256 { + let (tx, rx) = oneshot::channel(); + self.sender + .send(PersistenceAction::SaveBlocks((blocks, tx))) + .expect("should be able to send"); + rx.await.expect("todo: err handling") + } + + /// Tells the persistence task to remove blocks above a certain block number. The removed blocks + /// are returned by the task. + pub async fn remove_blocks_above(&self, block_num: u64) -> Vec { + let (tx, rx) = oneshot::channel(); + self.sender + .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) + .expect("should be able to send"); + rx.await.expect("todo: err handling") + } +} diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs new file mode 100644 index 000000000..eed483e29 --- /dev/null +++ b/crates/engine/tree/src/test_utils.rs @@ -0,0 +1,21 @@ +use reth_network_p2p::test_utils::TestFullBlockClient; +use reth_primitives::{BlockBody, SealedHeader}; +use std::ops::Range; + +pub(crate) fn insert_headers_into_client( + client: &TestFullBlockClient, + genesis_header: SealedHeader, + range: Range, +) { + let mut sealed_header = genesis_header; + let body = BlockBody::default(); + for _ in range { + let (mut header, hash) = sealed_header.split(); + // update to the next header + header.parent_hash = hash; + header.number += 1; + header.timestamp += 1; + sealed_header = header.seal_slow(); + client.insert(sealed_header.clone(), body.clone()); + } +} diff --git a/crates/engine/tree/src/tree/memory_overlay.rs b/crates/engine/tree/src/tree/memory_overlay.rs new file mode 100644 index 000000000..7e0e1d52e --- /dev/null +++ b/crates/engine/tree/src/tree/memory_overlay.rs @@ -0,0 +1,123 @@ +use super::ExecutedBlock; +use reth_errors::ProviderResult; +use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256}; +use reth_provider::{AccountReader, BlockHashReader, StateProvider, StateRootProvider}; +use reth_trie::{updates::TrieUpdates, AccountProof}; +use revm::db::BundleState; + +/// A state provider that stores references to in-memory blocks along with their state as well as +/// the historical state provider for fallback lookups. +#[derive(Debug)] +pub struct MemoryOverlayStateProvider { + /// The collection of executed parent blocks. + in_memory: Vec, + /// Historical state provider for state lookups that are not found in in-memory blocks. + historical: H, +} + +impl MemoryOverlayStateProvider { + /// Create new memory overlay state provider. 
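+    ///
+    /// `in_memory` is expected to hold the executed, not yet persisted parent blocks; lookups
+    /// scan these blocks from newest to oldest and fall back to the `historical` provider when
+    /// the requested data is not found in memory.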
+ pub const fn new(in_memory: Vec, historical: H) -> Self { + Self { in_memory, historical } + } +} + +impl BlockHashReader for MemoryOverlayStateProvider +where + H: BlockHashReader, +{ + fn block_hash(&self, number: BlockNumber) -> ProviderResult> { + for block in self.in_memory.iter().rev() { + if block.block.number == number { + return Ok(Some(block.block.hash())) + } + } + + self.historical.block_hash(number) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + let range = start..end; + let mut earliest_block_number = None; + let mut in_memory_hashes = Vec::new(); + for block in self.in_memory.iter().rev() { + if range.contains(&block.block.number) { + in_memory_hashes.insert(0, block.block.hash()); + earliest_block_number = Some(block.block.number); + } + } + + let mut hashes = + self.historical.canonical_hashes_range(start, earliest_block_number.unwrap_or(end))?; + hashes.append(&mut in_memory_hashes); + Ok(hashes) + } +} + +impl AccountReader for MemoryOverlayStateProvider +where + H: AccountReader + Send, +{ + fn basic_account(&self, address: Address) -> ProviderResult> { + for block in self.in_memory.iter().rev() { + if let Some(account) = block.execution_output.account(&address) { + return Ok(account) + } + } + + self.historical.basic_account(address) + } +} + +impl StateRootProvider for MemoryOverlayStateProvider +where + H: StateRootProvider + Send, +{ + fn state_root(&self, bundle_state: &BundleState) -> ProviderResult { + todo!() + } + + fn state_root_with_updates( + &self, + bundle_state: &BundleState, + ) -> ProviderResult<(B256, TrieUpdates)> { + todo!() + } +} + +impl StateProvider for MemoryOverlayStateProvider +where + H: StateProvider + Send, +{ + fn storage( + &self, + address: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + for block in self.in_memory.iter().rev() { + if let Some(value) = block.execution_output.storage(&address, storage_key.into()) { + return Ok(Some(value)) + } + } + + self.historical.storage(address, storage_key) + } + + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + for block in self.in_memory.iter().rev() { + if let Some(contract) = block.execution_output.bytecode(&code_hash) { + return Ok(Some(contract)) + } + } + + self.historical.bytecode_by_hash(code_hash) + } + + fn proof(&self, address: Address, keys: &[B256]) -> ProviderResult { + todo!() + } +} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs new file mode 100644 index 000000000..e4cf33923 --- /dev/null +++ b/crates/engine/tree/src/tree/mod.rs @@ -0,0 +1,574 @@ +use crate::{backfill::BackfillAction, engine::DownloadRequest}; +use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated}; +use reth_blockchain_tree::{ + error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus, +}; +use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; +use reth_consensus::{Consensus, PostExecutionInput}; +use reth_engine_primitives::EngineTypes; +use reth_errors::{ConsensusError, ProviderResult}; +use reth_evm::execute::{BlockExecutorProvider, Executor}; +use reth_payload_primitives::PayloadTypes; +use reth_payload_validator::ExecutionPayloadValidator; +use reth_primitives::{ + Address, Block, BlockNumber, Receipts, Requests, SealedBlock, SealedBlockWithSenders, B256, + U256, +}; +use reth_provider::{BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory}; +use 
+use reth_rpc_types::{
+    engine::{
+        CancunPayloadFields, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
+        PayloadValidationError,
+    },
+    ExecutionPayload,
+};
+use reth_trie::{updates::TrieUpdates, HashedPostState};
+use std::{
+    collections::{BTreeMap, HashMap},
+    marker::PhantomData,
+    sync::Arc,
+};
+use tracing::*;
+
+mod memory_overlay;
+pub use memory_overlay::MemoryOverlayStateProvider;
+
+/// Represents an executed block stored in memory.
+#[derive(Clone, Debug)]
+pub struct ExecutedBlock {
+    block: Arc<SealedBlock>,
+    senders: Arc<Vec<Address>>,
+    execution_output: Arc<ExecutionOutcome>,
+    hashed_state: Arc<HashedPostState>,
+    trie: Arc<TrieUpdates>,
+}
+
+impl ExecutedBlock {
+    /// Returns a reference to the executed block.
+    pub(crate) fn block(&self) -> &SealedBlock {
+        &self.block
+    }
+}
+
+/// Keeps track of the state of the tree.
+#[derive(Debug)]
+pub struct TreeState {
+    /// All executed blocks by hash.
+    blocks_by_hash: HashMap<B256, ExecutedBlock>,
+    /// Executed blocks grouped by their respective block number.
+    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock>>,
+}
+
+impl TreeState {
+    fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock>> {
+        self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
+    }
+
+    /// Insert executed block into the state.
+    fn insert_executed(&mut self, executed: ExecutedBlock) {
+        self.blocks_by_number.entry(executed.block.number).or_default().push(executed.clone());
+        let existing = self.blocks_by_hash.insert(executed.block.hash(), executed);
+        debug_assert!(existing.is_none(), "inserted duplicate block");
+    }
+
+    /// Remove blocks before the specified block number.
+    pub(crate) fn remove_before(&mut self, block_number: BlockNumber) {
+        while self
+            .blocks_by_number
+            .first_key_value()
+            .map(|entry| entry.0 < &block_number)
+            .unwrap_or_default()
+        {
+            let (_, to_remove) = self.blocks_by_number.pop_first().unwrap();
+            for block in to_remove {
+                let block_hash = block.block.hash();
+                let removed = self.blocks_by_hash.remove(&block_hash);
+                debug_assert!(
+                    removed.is_some(),
+                    "attempted to remove non-existing block {block_hash}"
+                );
+            }
+        }
+    }
+}
+
+/// Tracks the state of the engine API internals.
+///
+/// This type is shareable.
+#[derive(Debug)]
+pub struct EngineApiTreeState {
+    /// Tracks the state of the blockchain tree.
+    tree_state: TreeState,
+    /// Tracks the forkchoice state updates received by the CL.
+    forkchoice_state_tracker: ForkchoiceStateTracker,
+    /// Buffer of detached blocks.
+    buffer: BlockBuffer,
+    /// Tracks the headers of payloads that were rejected by the engine as invalid.
+    invalid_headers: InvalidHeaderCache,
+}
+
+/// The type responsible for processing engine API requests.
+///
+/// TODO: design: should the engine handler functions also accept the response channel, or return
+/// the result and let the caller forward the response?
+pub trait EngineApiTreeHandler: Send + Sync {
+    /// The engine type that this handler is for.
+    type Engine: EngineTypes;
+
+    /// Invoked when previously requested blocks were downloaded.
+    fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent>;
+
+    /// When the Consensus layer receives a new block via the consensus gossip protocol,
+    /// the transactions in the block are sent to the execution layer in the form of an
+    /// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
+    /// state in the block header, then passes validation data back to the Consensus layer, which
+    /// adds the block to the head of its own blockchain and attests to it. The block is then
+    /// broadcast over the consensus p2p network in the form of a "Beacon block".
+    ///
+    /// These responses should adhere to the [Engine API Spec for
+    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
+    ///
+    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
+    /// returns an error if an internal error occurred.
+    fn on_new_payload(
+        &mut self,
+        payload: ExecutionPayload,
+        cancun_fields: Option<CancunPayloadFields>,
+    ) -> ProviderResult<TreeOutcome<PayloadStatus>>;
+
+    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
+    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
+    /// chain.
+    ///
+    /// These responses should adhere to the [Engine API Spec for
+    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
+    ///
+    /// Returns an error if an internal error occurred like a database error.
+    fn on_forkchoice_updated(
+        &mut self,
+        state: ForkchoiceState,
+        attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
+    ) -> TreeOutcome<OnForkChoiceUpdated>;
+}
+
+/// The outcome of a tree operation.
+#[derive(Debug)]
+pub struct TreeOutcome<T> {
+    /// The outcome of the operation.
+    pub outcome: T,
+    /// An optional event to tell the caller to do something.
+    pub event: Option<TreeEvent>,
+}
+
+impl<T> TreeOutcome<T> {
+    /// Create new tree outcome.
+    pub const fn new(outcome: T) -> Self {
+        Self { outcome, event: None }
+    }
+
+    /// Set event on the outcome.
+    pub fn with_event(mut self, event: TreeEvent) -> Self {
+        self.event = Some(event);
+        self
+    }
+}
+
+/// Events that can be emitted by the [`EngineApiTreeHandler`].
+#[derive(Debug)]
+pub enum TreeEvent {
+    /// Tree action is needed.
+    TreeAction(TreeAction),
+    /// Backfill action is needed.
+    BackfillAction(BackfillAction),
+    /// Block download is needed.
+    Download(DownloadRequest),
+}
+
+/// The actions that can be performed on the tree.
+#[derive(Debug)]
+pub enum TreeAction {
+    /// Make target canonical.
+    MakeCanonical(B256),
+}
+
+#[derive(Debug)]
+pub struct EngineApiTreeHandlerImpl<P, E, T> {
+    provider: P,
+    executor_provider: E,
+    consensus: Arc<dyn Consensus>,
+    payload_validator: ExecutionPayloadValidator,
+    state: EngineApiTreeState,
+    /// (tmp) The flag indicating whether the pipeline is active.
+    is_pipeline_active: bool,
+    _marker: PhantomData<T>,
+}
+
+impl<P, E, T> EngineApiTreeHandlerImpl<P, E, T>
+where
+    P: BlockReader + StateProviderFactory,
+    E: BlockExecutorProvider,
+    T: EngineTypes,
+{
+    /// Return block from database or in-memory state by hash.
+    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
+        // check database first
+        let mut block = self.provider.block_by_hash(hash)?;
+        if block.is_none() {
+            // Note: it's fine to return the unsealed block because the caller already has
+            // the hash
+            block = self
+                .state
+                .tree_state
+                .block_by_hash(hash)
+                // TODO: clone for compatibility. should we return an Arc here?
+                .map(|block| block.as_ref().clone().unseal());
+        }
+        Ok(block)
+    }
+
+    /// Return state provider with reference to in-memory blocks that overlay database state.
+    fn state_provider(
+        &self,
+        hash: B256,
+    ) -> ProviderResult<MemoryOverlayStateProvider<Box<dyn StateProvider>>> {
+        let mut in_memory = Vec::new();
+        let mut parent_hash = hash;
+        while let Some(executed) = self.state.tree_state.blocks_by_hash.get(&parent_hash) {
+            parent_hash = executed.block.parent_hash;
+            in_memory.insert(0, executed.clone());
+        }
+
+        let historical = self.provider.state_by_block_hash(parent_hash)?;
+        Ok(MemoryOverlayStateProvider::new(in_memory, historical))
+    }
+
+    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
+    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
+    /// not exist in the buffer, this returns the hash that is passed in.
+    ///
+    /// Returns the parent hash of the block itself if the block is buffered and has no other
+    /// buffered ancestors.
+    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
+        self.state
+            .buffer
+            .lowest_ancestor(&hash)
+            .map(|block| block.parent_hash)
+            .unwrap_or_else(|| hash)
+    }
+
+    /// If validation fails, the response MUST contain the latest valid hash:
+    ///
+    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
+    ///     conditions:
+    ///     - It is fully validated and deemed VALID
+    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
+    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
+    ///     conditions are satisfied by a `PoW` block.
+    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
+    ///     the above conditions.
+    fn latest_valid_hash_for_invalid_payload(
+        &mut self,
+        parent_hash: B256,
+    ) -> ProviderResult<Option<B256>> {
+        // Check if parent exists in side chain or in canonical chain.
+        if self.block_by_hash(parent_hash)?.is_some() {
+            return Ok(Some(parent_hash))
+        }
+
+        // iterate over ancestors in the invalid cache
+        // until we encounter the first valid ancestor
+        let mut current_hash = parent_hash;
+        let mut current_header = self.state.invalid_headers.get(&current_hash);
+        while let Some(header) = current_header {
+            current_hash = header.parent_hash;
+            current_header = self.state.invalid_headers.get(&current_hash);
+
+            // If current_header is None, then the current_hash does not have an invalid
+            // ancestor in the cache, check its presence in blockchain tree
+            if current_header.is_none() && self.block_by_hash(current_hash)?.is_some() {
+                return Ok(Some(current_hash))
+            }
+        }
+        Ok(None)
+    }
+
+    /// Prepares the invalid payload response for the given hash, checking the
+    /// database for the parent hash and populating the payload status with the latest valid hash
+    /// according to the engine API spec.
+    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
+        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
+        // PoW block, which we need to identify by looking at the parent's block difficulty
+        if let Some(parent) = self.block_by_hash(parent_hash)? {
+            if !parent.is_zero_difficulty() {
+                parent_hash = B256::ZERO;
+            }
+        }
+
+        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
+        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
+            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
+        })
+        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
+    }
+
+    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
+    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
+    ///
+    /// Returns a payload status response according to the engine API spec if the block is known to
+    /// be invalid.
+    fn check_invalid_ancestor_with_head(
+        &mut self,
+        check: B256,
+        head: B256,
+    ) -> ProviderResult<Option<PayloadStatus>> {
+        // check if the check hash was previously marked as invalid
+        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
+
+        // populate the latest valid hash field
+        let status = self.prepare_invalid_response(header.parent_hash)?;
+
+        // insert the head block into the invalid header cache
+        self.state.invalid_headers.insert_with_invalid_ancestor(head, header);
+
+        Ok(Some(status))
+    }
+
+    /// Validate if block is correct and satisfies all the consensus rules that concern the header
+    /// and block body itself.
+    fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), ConsensusError> {
+        if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) {
+            error!(
+                ?block,
+                "Failed to validate total difficulty for block {}: {e}",
+                block.header.hash()
+            );
+            return Err(e)
+        }
+
+        if let Err(e) = self.consensus.validate_header(block) {
+            error!(?block, "Failed to validate header {}: {e}", block.header.hash());
+            return Err(e)
+        }
+
+        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
+            error!(?block, "Failed to validate block {}: {e}", block.header.hash());
+            return Err(e)
+        }
+
+        Ok(())
+    }
+
+    fn buffer_block_without_senders(&mut self, block: SealedBlock) -> Result<(), InsertBlockError> {
+        match block.try_seal_with_senders() {
+            Ok(block) => self.buffer_block(block),
+            Err(block) => Err(InsertBlockError::sender_recovery_error(block)),
+        }
+    }
+
+    fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
+        if let Err(err) = self.validate_block(&block) {
+            return Err(InsertBlockError::consensus_error(err, block.block))
+        }
+        self.state.buffer.insert_block(block);
+        Ok(())
+    }
+
+    fn insert_block_without_senders(
+        &mut self,
+        block: SealedBlock,
+    ) -> Result<InsertPayloadOk, InsertBlockError> {
+        match block.try_seal_with_senders() {
+            Ok(block) => self.insert_block(block),
+            Err(block) => Err(InsertBlockError::sender_recovery_error(block)),
+        }
+    }
+
+    fn insert_block(
+        &mut self,
+        block: SealedBlockWithSenders,
+    ) -> Result<InsertPayloadOk, InsertBlockError> {
+        self.insert_block_inner(block.clone())
+            .map_err(|kind| InsertBlockError::new(block.block, kind))
+    }
+
+    fn insert_block_inner(
+        &mut self,
+        block: SealedBlockWithSenders,
+    ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
+        if self.block_by_hash(block.hash())?.is_some() {
+            let attachment = BlockAttachment::Canonical; // TODO: remove or revise attachment
+            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid(attachment)))
+        }
+
+        // validate block consensus rules
+        self.validate_block(&block)?;
+
+        let state_provider = self.state_provider(block.parent_hash).unwrap();
+        let executor =
+            self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
+
+        let block_number = block.number;
+        let block_hash = block.hash();
+        let block = block.unseal();
+        let output = executor.execute((&block, U256::MAX).into()).unwrap();
+        self.consensus.validate_block_post_execution(
+            &block,
+            PostExecutionInput::new(&output.receipts, &output.requests),
+        )?;
+
+        let hashed_state = HashedPostState::from_bundle_state(&output.state.state);
+
+        // TODO: compute and validate state root
+        let trie_output = TrieUpdates::default();
+
+        let executed = ExecutedBlock {
+            block: Arc::new(block.block.seal(block_hash)),
+            senders: Arc::new(block.senders),
+            execution_output: Arc::new(ExecutionOutcome::new(
+                output.state,
+                Receipts::from(output.receipts),
+                block_number,
+                vec![Requests::from(output.requests)],
+            )),
+            hashed_state: Arc::new(hashed_state),
+            trie: Arc::new(trie_output),
+        };
+        self.state.tree_state.insert_executed(executed);
+
+        let attachment = BlockAttachment::Canonical; // TODO: remove or revise attachment
+        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)))
+    }
+}
+
+impl<P, E, T> EngineApiTreeHandler for EngineApiTreeHandlerImpl<P, E, T>
+where
+    P: BlockReader + StateProviderFactory + Clone,
+    E: BlockExecutorProvider,
+    T: EngineTypes,
+{
+    type Engine = T;
+
+    fn on_downloaded(&mut self, _blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
+        todo!()
+    }
+
+    fn on_new_payload(
+        &mut self,
+        payload: ExecutionPayload,
+        cancun_fields: Option<CancunPayloadFields>,
+    ) -> ProviderResult<TreeOutcome<PayloadStatus>> {
+        // Ensures that the given payload does not violate any consensus rules that concern the
+        // block's layout, like:
+        //    - missing or invalid base fee
+        //    - invalid extra data
+        //    - invalid transactions
+        //    - incorrect hash
+        //    - the versioned hashes passed with the payload do not exactly match transaction
+        //      versioned hashes
+        //    - the block does not contain blob transactions if it is pre-cancun
+        //
+        // This validates the following engine API rule:
+        //
+        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
+        //    validation by taking the following steps:
+        //
+        //    1. Obtain the actual array by concatenating blob versioned hashes lists
+        //       (`tx.blob_versioned_hashes`) of each [blob
+        //       transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
+        //       in the payload, respecting the order of inclusion. If the payload has no blob
+        //       transactions the expected array **MUST** be `[]`.
+        //
+        //    2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
+        //       null}` if the expected and the actual arrays don't match.
+        //
+        // This validation **MUST** be instantly run in all cases even during active sync process.
+        let parent_hash = payload.parent_hash();
+        let block = match self
+            .payload_validator
+            .ensure_well_formed_payload(payload, cancun_fields.into())
+        {
+            Ok(block) => block,
+            Err(error) => {
+                error!(target: "engine::tree", %error, "Invalid payload");
+                // we need to convert the error to a payload status (response to the CL)
+
+                let latest_valid_hash =
+                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
+                        // Engine-API rules:
+                        // > `latestValidHash: null` if the blockHash validation has failed ()
+                        // > `latestValidHash: null` if the expected and the actual arrays don't match ()
+                        None
+                    } else {
+                        self.latest_valid_hash_for_invalid_payload(parent_hash)?
+                    };
+
+                let status = PayloadStatusEnum::from(error);
+                return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
+            }
+        };
+
+        let block_hash = block.hash();
+        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
+        if lowest_buffered_ancestor == block_hash {
+            lowest_buffered_ancestor = block.parent_hash;
+        }
+
+        // now check the block itself
+        if let Some(status) =
+            self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
+        {
+            return Ok(TreeOutcome::new(status))
+        }
+
+        let status = if self.is_pipeline_active {
+            self.buffer_block_without_senders(block).unwrap();
+            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
+        } else {
+            let mut latest_valid_hash = None;
+            let status = match self.insert_block_without_senders(block).unwrap() {
+                InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
+                InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
+                    latest_valid_hash = Some(block_hash);
+                    PayloadStatusEnum::Valid
+                }
+                InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
+                InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
+                    // TODO: isn't this check redundant?
+                    // check if the block's parent is already marked as invalid
+                    // if let Some(status) = self
+                    //     .check_invalid_ancestor_with_head(block.parent_hash, block.hash())
+                    //     .map_err(|error| {
+                    //         InsertBlockError::new(block, InsertBlockErrorKind::Provider(error))
+                    //     })?
+                    // {
+                    //     return Ok(status)
+                    // }
+
+                    // not known to be invalid, but we don't know anything else
+                    PayloadStatusEnum::Syncing
+                }
+            };
+            PayloadStatus::new(status, latest_valid_hash)
+        };
+
+        let mut outcome = TreeOutcome::new(status);
+        if outcome.outcome.is_valid() {
+            if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
+                if target.head_block_hash == block_hash {
+                    outcome = outcome
+                        .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash)));
+                }
+            }
+        }
+        Ok(outcome)
+    }
+
+    fn on_forkchoice_updated(
+        &mut self,
+        state: ForkchoiceState,
+        attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
+    ) -> TreeOutcome<OnForkChoiceUpdated> {
+        todo!()
+    }
+}
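
As a rough usage sketch (not part of the diff above), the new `PersistenceHandle` could be driven as follows. The `std::sync::mpsc` channel type and the `reth_engine_tree::persistence` / `reth_engine_tree::tree` module paths are assumptions; only the handle methods themselves are taken from the hunk above.

    // Illustrative only; assumes a std::sync::mpsc `Sender<PersistenceAction>` and the
    // module paths shown in the `use` statements, neither of which is visible in this patch.
    use std::sync::mpsc;

    use reth_engine_tree::persistence::{PersistenceAction, PersistenceHandle};
    use reth_engine_tree::tree::ExecutedBlock;

    async fn persist_executed(blocks: Vec<ExecutedBlock>) {
        // The receiving end would normally be owned by the spawned persistence task; only the
        // handle side is shown here, so these calls would pend until a task services the channel.
        let (tx, _rx) = mpsc::channel::<PersistenceAction>();
        let handle = PersistenceHandle::new(tx);

        // Ask the task to write the executed blocks and wait for the hash of the latest
        // persisted block, which the tree can then remove from its in-memory structures.
        let latest_persisted = handle.save_blocks(blocks).await;
        let _ = latest_persisted;

        // Unwinding works the same way: blocks above the given number are removed by the task
        // and handed back to the caller.
        let _removed: Vec<ExecutedBlock> = handle.remove_blocks_above(42).await;
    }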