From ce40bea46e56b01896ae45b7577ff4e562b0cc58 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 23 Mar 2023 20:18:19 +0200 Subject: [PATCH] feat(sync): beacon consensus engine (#1845) --- Cargo.lock | 15 +- bin/reth/src/args/rpc_server_args.rs | 4 + bin/reth/src/node/mod.rs | 95 +- crates/consensus/beacon/Cargo.toml | 22 + crates/consensus/beacon/src/engine/error.rs | 23 + crates/consensus/beacon/src/engine/message.rs | 30 + crates/consensus/beacon/src/engine/mod.rs | 992 ++++++++++++++++++ .../beacon/src/engine/pipeline_state.rs | 26 + crates/consensus/beacon/src/lib.rs | 3 + crates/executor/src/blockchain_tree/mod.rs | 76 +- crates/interfaces/src/executor.rs | 6 +- crates/rpc/rpc-builder/Cargo.toml | 2 +- crates/rpc/rpc-builder/src/auth.rs | 18 +- crates/rpc/rpc-engine-api/Cargo.toml | 2 +- crates/rpc/rpc-engine-api/src/engine_api.rs | 829 ++------------- crates/rpc/rpc-engine-api/src/error.rs | 50 +- crates/rpc/rpc-engine-api/src/message.rs | 18 +- crates/rpc/rpc-types/Cargo.toml | 7 +- crates/rpc/rpc-types/src/eth/engine.rs | 235 ----- .../rpc-types/src/eth/engine/forkchoice.rs | 39 + crates/rpc/rpc-types/src/eth/engine/mod.rs | 22 + .../rpc/rpc-types/src/eth/engine/payload.rs | 361 +++++++ .../rpc-types/src/eth/engine/transition.rs | 14 + .../rpc/rpc/src/{engine/mod.rs => engine.rs} | 122 ++- crates/stages/src/pipeline/ctrl.rs | 4 +- crates/stages/src/pipeline/mod.rs | 42 +- .../storage/provider/src/test_utils/blocks.rs | 2 +- 27 files changed, 1918 insertions(+), 1141 deletions(-) create mode 100644 crates/consensus/beacon/src/engine/error.rs create mode 100644 crates/consensus/beacon/src/engine/message.rs create mode 100644 crates/consensus/beacon/src/engine/mod.rs create mode 100644 crates/consensus/beacon/src/engine/pipeline_state.rs delete mode 100644 crates/rpc/rpc-types/src/eth/engine.rs create mode 100644 crates/rpc/rpc-types/src/eth/engine/forkchoice.rs create mode 100644 crates/rpc/rpc-types/src/eth/engine/mod.rs create mode 100644 crates/rpc/rpc-types/src/eth/engine/payload.rs create mode 100644 crates/rpc/rpc-types/src/eth/engine/transition.rs rename crates/rpc/rpc/src/{engine/mod.rs => engine.rs} (64%) diff --git a/Cargo.lock b/Cargo.lock index 6ca5f9d40..9c0382902 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4483,9 +4483,21 @@ dependencies = [ name = "reth-beacon-consensus" version = "0.1.0" dependencies = [ + "assert_matches", + "futures", "reth-consensus-common", + "reth-db", + "reth-executor", "reth-interfaces", "reth-primitives", + "reth-provider", + "reth-rpc-types", + "reth-stages", + "reth-tracing", + "thiserror", + "tokio", + "tokio-stream", + "tracing", ] [[package]] @@ -5101,12 +5113,12 @@ version = "0.1.0" dependencies = [ "assert_matches", "futures", + "reth-beacon-consensus", "reth-executor", "reth-interfaces", "reth-primitives", "reth-provider", "reth-revm", - "reth-rlp", "reth-rpc-types", "thiserror", "tokio", @@ -5118,6 +5130,7 @@ dependencies = [ name = "reth-rpc-types" version = "0.1.0" dependencies = [ + "assert_matches", "jsonrpsee-types", "lru 0.9.0", "rand 0.8.5", diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 9bef1c268..a431603b6 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -5,6 +5,7 @@ use clap::Args; use jsonrpsee::{core::Error as RpcError, server::ServerHandle}; use reth_interfaces::events::ChainEventSubscriptions; use reth_network_api::{NetworkInfo, Peers}; +use reth_primitives::ChainSpec; use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{JwtError, JwtSecret}; use reth_rpc_builder::{ @@ -17,6 +18,7 @@ use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, + sync::Arc, }; /// Parameters for configuring the rpc more granularity via CLI @@ -149,6 +151,7 @@ impl RpcServerArgs { pool: Pool, network: Network, executor: Tasks, + chain_spec: Arc, handle: EngineApiHandle, ) -> Result where @@ -173,6 +176,7 @@ impl RpcServerArgs { pool, network, executor, + chain_spec, handle, socket_address, secret, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 63b2270dc..3324529b4 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -13,7 +13,7 @@ use events::NodeEvent; use eyre::Context; use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; -use reth_beacon_consensus::BeaconConsensus; +use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; use reth_db::{ database::Database, mdbx::{Env, WriteMap}, @@ -25,6 +25,10 @@ use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; +use reth_executor::{ + blockchain_tree::{config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree}, + Factory, +}; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, p2p::{ @@ -61,7 +65,10 @@ use std::{ path::PathBuf, sync::Arc, }; -use tokio::sync::{mpsc::unbounded_channel, watch}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, watch, +}; use tracing::*; pub mod events; @@ -219,9 +226,37 @@ impl Command { info!(target: "reth::cli", "Continuous sync mode enabled"); } - // TODO: This will be fixed with the sync controller (https://github.com/paradigmxyz/reth/pull/1662) - let (tx, _rx) = watch::channel(ForkchoiceState::default()); - let engine_api_handle = self.init_engine_api(Arc::clone(&db), tx, &ctx.task_executor); + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + + // Forward the `debug.tip` as forkchoice state to the consensus engine. + // This will initiate the sync up to the provided tip. + let _tip_rx = match self.tip { + Some(tip) => { + let (tip_tx, tip_rx) = oneshot::channel(); + let state = ForkchoiceState { + head_block_hash: tip, + finalized_block_hash: tip, + safe_block_hash: tip, + }; + consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs: None, + tx: tip_tx, + })?; + debug!(target: "reth::cli", %tip, "Tip manually set"); + Some(tip_rx) + } + None => { + let warn_msg = "No tip specified. \ + reth cannot communicate with consensus clients, \ + so a tip must manually be provided for the online stages with --debug.tip ."; + warn!(target: "reth::cli", warn_msg); + None + } + }; + + let engine_api_handle = + self.init_engine_api(Arc::clone(&db), consensus_engine_tx, &ctx.task_executor); info!(target: "reth::cli", "Engine API handler initialized"); let _auth_server = self @@ -231,12 +266,13 @@ impl Command { transaction_pool, network.clone(), ctx.task_executor.clone(), + self.chain.clone(), engine_api_handle, ) .await?; info!(target: "reth::cli", "Started Auth server"); - let (mut pipeline, events) = self + let (pipeline, events) = self .build_networked_pipeline( &mut config, network.clone(), @@ -246,29 +282,22 @@ impl Command { ) .await?; - if let Some(tip) = self.tip { - pipeline.set_tip(tip); - debug!(target: "reth::cli", %tip, "Tip manually set"); - } else { - let warn_msg = "No tip specified. \ - reth cannot communicate with consensus clients, \ - so a tip must manually be provided for the online stages with --debug.tip ."; - warn!(target: "reth::cli", warn_msg); - } - ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events)); - // Run pipeline + let beacon_consensus_engine = + self.build_consensus_engine(db.clone(), consensus, pipeline, consensus_engine_rx)?; + + // Run consensus engine let (rx, tx) = tokio::sync::oneshot::channel(); - info!(target: "reth::cli", "Starting sync pipeline"); - ctx.task_executor.spawn_critical_blocking("pipeline task", async move { - let res = pipeline.run(db.clone()).await; + info!(target: "reth::cli", "Starting consensus engine"); + ctx.task_executor.spawn_critical_blocking("consensus engine", async move { + let res = beacon_consensus_engine.await; let _ = rx.send(res); }); tx.await??; - info!(target: "reth::cli", "Pipeline has finished."); + info!(target: "reth::cli", "Consensus engine has exited."); if self.terminate { Ok(()) @@ -327,6 +356,26 @@ impl Command { Ok((pipeline, events)) } + fn build_consensus_engine( + &self, + db: Arc, + consensus: C, + pipeline: Pipeline, + message_rx: UnboundedReceiver, + ) -> eyre::Result> + where + DB: Database + Unpin + 'static, + U: SyncStateUpdater + Unpin + 'static, + C: Consensus + Unpin + 'static, + { + let executor_factory = Factory::new(self.chain.clone()); + let tree_externals = + TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone()); + let blockchain_tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?; + + Ok(BeaconConsensusEngine::new(db, pipeline, blockchain_tree, message_rx, self.max_block)) + } + fn load_config(&self) -> eyre::Result { confy::load_path::(&self.config).wrap_err("Could not load config") } @@ -355,7 +404,7 @@ impl Command { fn init_engine_api( &self, db: Arc>, - forkchoice_state_tx: watch::Sender, + engine_tx: UnboundedSender, task_executor: &TaskExecutor, ) -> EngineApiHandle { let (message_tx, message_rx) = unbounded_channel(); @@ -363,7 +412,7 @@ impl Command { ShareableDatabase::new(db, self.chain.clone()), self.chain.clone(), message_rx, - forkchoice_state_tx, + engine_tx, ); task_executor.spawn_critical("engine API task", engine_api); message_tx diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 1ff30c99f..82c9449ba 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -11,6 +11,28 @@ readme = "README.md" reth-consensus-common = { path = "../common" } reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } +reth-stages = { path = "../../stages" } +reth-db = { path = "../../storage/db" } +reth-provider = { path = "../../storage/provider" } +reth-executor = { path = "../../executor" } +reth-rpc-types = { path = "../../rpc/rpc-types" } + +# async +tokio = { version = "1.21.2", features = ["sync"] } +tokio-stream = "0.1.10" +futures = "0.3" + +# misc +tracing = "0.1" +thiserror = "1.0" [dev-dependencies] +# reth reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } +reth-stages = { path = "../../stages", features = ["test-utils"] } +reth-executor = { path = "../../executor", features = ["test-utils"] } +reth-db = { path = "../../storage/db", features = ["test-utils"] } +reth-provider = { path = "../../storage/provider", features = ["test-utils"] } +reth-tracing = { path = "../../tracing" } + +assert_matches = "1.5" diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs new file mode 100644 index 000000000..64da402e9 --- /dev/null +++ b/crates/consensus/beacon/src/engine/error.rs @@ -0,0 +1,23 @@ +use reth_rpc_types::engine::PayloadError; +use reth_stages::PipelineError; +use thiserror::Error; + +/// Beacon engine result. +pub type BeaconEngineResult = Result; + +/// The error wrapper for the beacon consensus engine. +#[derive(Error, Debug)] +pub enum BeaconEngineError { + /// Forkchoice zero hash head received. + #[error("Received zero hash as forkchoice head")] + ForkchoiceEmptyHead, + /// Encountered a payload error. + #[error(transparent)] + Payload(#[from] PayloadError), + /// Pipeline error. + #[error(transparent)] + Pipeline(#[from] PipelineError), + /// Common error. Wrapper around [reth_interfaces::Error]. + #[error(transparent)] + Common(#[from] reth_interfaces::Error), +} diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs new file mode 100644 index 000000000..4b39a9054 --- /dev/null +++ b/crates/consensus/beacon/src/engine/message.rs @@ -0,0 +1,30 @@ +use crate::BeaconEngineResult; +use reth_interfaces::consensus::ForkchoiceState; +use reth_rpc_types::engine::{ + ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, +}; +use tokio::sync::oneshot; + +/// Beacon engine sender. +pub type BeaconEngineSender = oneshot::Sender>; + +/// A message for the beacon engine from other components of the node. +#[derive(Debug)] +pub enum BeaconEngineMessage { + /// Message with new payload. + NewPayload { + /// The execution payload received by Engine API. + payload: ExecutionPayload, + /// The sender for returning payload status result. + tx: BeaconEngineSender, + }, + /// Message with updated forkchoice state. + ForkchoiceUpdated { + /// The updated forkchoice state. + state: ForkchoiceState, + /// The payload attributes for block building. + payload_attrs: Option, + /// The sender for returning forkchoice updated result. + tx: BeaconEngineSender, + }, +} diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs new file mode 100644 index 000000000..526443bae --- /dev/null +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -0,0 +1,992 @@ +use futures::{Future, FutureExt, StreamExt}; +use reth_db::{database::Database, tables, transaction::DbTx}; +use reth_executor::blockchain_tree::{BlockStatus, BlockchainTree}; +use reth_interfaces::{ + consensus::{Consensus, ForkchoiceState}, + executor::Error as ExecutorError, + sync::SyncStateUpdater, + Error, +}; +use reth_primitives::{BlockHash, BlockNumber, SealedBlock, H256}; +use reth_provider::ExecutorFactory; +use reth_rpc_types::engine::{ + ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, +}; +use reth_stages::Pipeline; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::*; + +mod error; +pub use error::{BeaconEngineError, BeaconEngineResult}; + +mod message; +pub use message::{BeaconEngineMessage, BeaconEngineSender}; + +mod pipeline_state; +pub use pipeline_state::PipelineState; + +/// The beacon consensus engine is the driver that switches between historical and live sync. +/// +/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are +/// received by Engine API. +/// +/// The consensus engine is idle until it receives the first +/// [BeaconEngineMessage::ForkchoiceUpdated] message from the CL which would initiate the sync. At +/// first, the consensus engine would run the [Pipeline] until the latest known block hash. +/// Afterwards, it would attempt to create/restore the [BlockchainTree] from the blocks +/// that are currently available. In case the restoration is successful, the consensus engine would +/// run in a live sync mode, which mean it would solemnly rely on the messages from Engine API to +/// construct the chain forward. +/// +/// # Panics +/// +/// If the future is polled more than once. Leads to undefined state. +#[must_use = "Future does nothing unless polled"] +pub struct BeaconConsensusEngine +where + DB: Database, + U: SyncStateUpdater, + C: Consensus, + EF: ExecutorFactory, +{ + /// The database handle. + db: Arc, + /// The current state of the pipeline. + /// Must always be [Some] unless the state is being reevaluated. + /// The pipeline is used for historical sync by setting the current forkchoice head. + pipeline_state: Option>, + /// The blockchain tree used for live sync and reorg tracking. + blockchain_tree: BlockchainTree, + /// The Engine API message receiver. + message_rx: UnboundedReceiverStream, + /// Current forkchoice state. The engine must receive the initial state in order to start + /// syncing. + forkchoice_state: Option, + /// Next action that the engine should take after the pipeline finished running. + next_action: BeaconEngineAction, + /// Max block after which the consensus engine would terminate the sync. Used for debugging + /// purposes. + max_block: Option, +} + +impl BeaconConsensusEngine +where + DB: Database + Unpin + 'static, + U: SyncStateUpdater + 'static, + C: Consensus, + EF: ExecutorFactory + 'static, +{ + /// Create new instance of the [BeaconConsensusEngine]. + /// + /// The `message_rx` receiver is connected to the Engine API and is used to + /// handle the messages received from the Consensus Layer. + pub fn new( + db: Arc, + pipeline: Pipeline, + blockchain_tree: BlockchainTree, + message_rx: UnboundedReceiver, + max_block: Option, + ) -> Self { + Self { + db, + pipeline_state: Some(PipelineState::Idle(pipeline)), + blockchain_tree, + message_rx: UnboundedReceiverStream::new(message_rx), + forkchoice_state: None, + next_action: BeaconEngineAction::None, + max_block, + } + } + + /// Returns `true` if the pipeline is currently idle. + fn is_pipeline_idle(&self) -> bool { + self.pipeline_state.as_ref().expect("pipeline state is set").is_idle() + } + + /// Set next action to [BeaconEngineAction::RunPipeline] to indicate that + /// consensus engine needs to run the pipeline as soon as it becomes available. + fn require_pipeline_run(&mut self, target: PipelineTarget) { + self.next_action = BeaconEngineAction::RunPipeline(target); + } + + /// Called to resolve chain forks and ensure that the Execution layer is working with the latest + /// valid chain. + /// + /// These responses should adhere to the [Engine API Spec for + /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1). + fn on_forkchoice_updated( + &mut self, + state: ForkchoiceState, + _attrs: Option, + ) -> ForkchoiceUpdated { + trace!(target: "consensus::engine", ?state, "Received new forkchoice state"); + if state.head_block_hash.is_zero() { + return ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Invalid { + validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), + })) + } + + let is_first_forkchoice = self.forkchoice_state.is_none(); + self.forkchoice_state = Some(state); + let status = if self.is_pipeline_idle() { + match self.blockchain_tree.make_canonical(&state.head_block_hash) { + Ok(_) => PayloadStatus::from_status(PayloadStatusEnum::Valid), + Err(error) => { + error!(target: "consensus::engine", ?state, ?error, "Error canonicalizing the head hash"); + // If this is the first forkchoice received, start downloading from safe block + // hash. + let target = if is_first_forkchoice { + PipelineTarget::Safe + } else { + PipelineTarget::Head + }; + self.require_pipeline_run(target); + match error { + Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => { + PayloadStatus::from_status(PayloadStatusEnum::Invalid { + validation_error: error.to_string(), + }) + .with_latest_valid_hash(H256::zero()) + } + _ => PayloadStatus::from_status(PayloadStatusEnum::Syncing), + } + } + } + } else { + PayloadStatus::from_status(PayloadStatusEnum::Syncing) + }; + ForkchoiceUpdated::new(status) + } + + /// When the Consensus layer receives a new block via the consensus gossip protocol, + /// the transactions in the block are sent to the execution layer in the form of a + /// `ExecutionPayload`. The Execution layer executes the transactions and validates the + /// state in the block header, then passes validation data back to Consensus layer, that + /// adds the block to the head of its own blockchain and attests to it. The block is then + /// broadcasted over the consensus p2p network in the form of a "Beacon block". + /// + /// These responses should adhere to the [Engine API Spec for + /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification). + fn on_new_payload(&mut self, payload: ExecutionPayload) -> PayloadStatus { + let block = match SealedBlock::try_from(payload) { + Ok(block) => block, + Err(error) => { + return PayloadStatus::from_status(PayloadStatusEnum::InvalidBlockHash { + validation_error: error.to_string(), + }) + } + }; + + if self.is_pipeline_idle() { + let block_hash = block.hash; + match self.blockchain_tree.insert_block(block) { + Ok(status) => { + let latest_valid_hash = + matches!(status, BlockStatus::Valid).then_some(block_hash); + let status = match status { + BlockStatus::Valid => PayloadStatusEnum::Valid, + BlockStatus::Accepted => PayloadStatusEnum::Accepted, + BlockStatus::Disconnected => PayloadStatusEnum::Syncing, + }; + PayloadStatus::new(status, latest_valid_hash) + } + Err(error) => { + let latest_valid_hash = + matches!(error, Error::Execution(ExecutorError::BlockPreMerge { .. })) + .then_some(H256::zero()); + PayloadStatus::new( + PayloadStatusEnum::Invalid { validation_error: error.to_string() }, + latest_valid_hash, + ) + } + } + } else { + PayloadStatus::from_status(PayloadStatusEnum::Syncing) + } + } + + /// Returns the next pipeline state depending on the current value of the next action. + /// Resets the next action to the default value. + fn next_pipeline_state( + &mut self, + pipeline: Pipeline, + forkchoice_state: ForkchoiceState, + ) -> PipelineState { + let next_action = std::mem::take(&mut self.next_action); + if let BeaconEngineAction::RunPipeline(target) = next_action { + let tip = match target { + PipelineTarget::Head => forkchoice_state.head_block_hash, + PipelineTarget::Safe => forkchoice_state.safe_block_hash, + }; + trace!(target: "consensus::engine", ?tip, "Starting the pipeline"); + PipelineState::Running(pipeline.run_as_fut(self.db.clone(), tip)) + } else { + PipelineState::Idle(pipeline) + } + } + + /// Attempt to restore the tree with the finalized block number. + /// If the finalized block is missing from the database, trigger the pipeline run. + fn restore_tree_if_possible( + &mut self, + finalized_hash: BlockHash, + ) -> Result<(), reth_interfaces::Error> { + match self.db.view(|tx| tx.get::(finalized_hash))?? { + Some(number) => self.blockchain_tree.restore_canonical_hashes(number)?, + None => self.require_pipeline_run(PipelineTarget::Head), + }; + Ok(()) + } + + /// Check if the engine reached max block as specified by `max_block` parameter. + fn has_reached_max_block(&self, progress: Option) -> bool { + if progress.zip(self.max_block).map_or(false, |(progress, target)| progress >= target) { + trace!( + target: "consensus::engine", + ?progress, + max_block = ?self.max_block, + "Consensus engine reached max block." + ); + true + } else { + false + } + } +} + +/// On initialization, the consensus engine will poll the message receiver and return +/// [Poll::Pending] until the first forkchoice update message is received. +/// +/// As soon as the consensus engine receives the first forkchoice updated message and updates the +/// local forkchoice state, it will launch the pipeline to sync to the head hash. +/// While the pipeline is syncing, the consensus engine will keep processing messages from the +/// receiver and forwarding them to the blockchain tree. +impl Future for BeaconConsensusEngine +where + DB: Database + Unpin + 'static, + U: SyncStateUpdater + Unpin + 'static, + C: Consensus + Unpin, + EF: ExecutorFactory + Unpin + 'static, +{ + type Output = Result<(), BeaconEngineError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Set the next pipeline state. + loop { + // Process all incoming messages first. + while let Poll::Ready(Some(msg)) = this.message_rx.poll_next_unpin(cx) { + match msg { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + let response = this.on_forkchoice_updated(state, payload_attrs); + let is_valid_response = + matches!(response.payload_status.status, PayloadStatusEnum::Valid); + let _ = tx.send(Ok(response)); + + // Terminate the sync early if it's reached the maximum user + // configured block. + if is_valid_response { + let tip_number = this.blockchain_tree.canonical_tip_number(); + if this.has_reached_max_block(tip_number) { + return Poll::Ready(Ok(())) + } + } + } + BeaconEngineMessage::NewPayload { payload, tx } => { + let response = this.on_new_payload(payload); + let _ = tx.send(Ok(response)); + } + } + } + + // Lookup the forkchoice state. We can't launch the pipeline without the tip. + let forkchoice_state = match &this.forkchoice_state { + Some(state) => *state, + None => return Poll::Pending, + }; + + let next_state = match this.pipeline_state.take().expect("pipeline state is set") { + PipelineState::Running(mut fut) => { + match fut.poll_unpin(cx) { + Poll::Ready((pipeline, result)) => { + if let Err(error) = result { + return Poll::Ready(Err(error.into())) + } + + match result { + Ok(_) => { + // Terminate the sync early if it's reached the maximum user + // configured block. + let minimum_pipeline_progress = *pipeline.minimum_progress(); + if this.has_reached_max_block(minimum_pipeline_progress) { + return Poll::Ready(Ok(())) + } + } + // Any pipeline error at this point is fatal. + Err(error) => return Poll::Ready(Err(error.into())), + }; + + // Update the state and hashes of the blockchain tree if possible + if let Err(error) = + this.restore_tree_if_possible(forkchoice_state.finalized_block_hash) + { + error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); + return Poll::Ready(Err(error.into())) + } + + // Get next pipeline state. + this.next_pipeline_state(pipeline, forkchoice_state) + } + Poll::Pending => { + this.pipeline_state = Some(PipelineState::Running(fut)); + return Poll::Pending + } + } + } + PipelineState::Idle(pipeline) => { + this.next_pipeline_state(pipeline, forkchoice_state) + } + }; + this.pipeline_state = Some(next_state); + + // If the pipeline is idle, break from the loop. + if this.is_pipeline_idle() { + return Poll::Pending + } + } + } +} + +/// Denotes the next action that the [BeaconConsensusEngine] should take. +#[derive(Debug, Default)] +enum BeaconEngineAction { + #[default] + None, + /// Contains the type of target hash to pass to the pipeline + RunPipeline(PipelineTarget), +} + +/// The target hash to pass to the pipeline. +#[derive(Debug, Default)] +enum PipelineTarget { + /// Corresponds to the head block hash. + #[default] + Head, + /// Corresponds to the safe block hash. + Safe, +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap}; + use reth_executor::{ + blockchain_tree::{config::BlockchainTreeConfig, externals::TreeExternals}, + post_state::PostState, + test_utils::TestExecutorFactory, + }; + use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus}; + use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; + use reth_provider::Transaction; + use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; + use std::{collections::VecDeque, time::Duration}; + use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot::{self, error::TryRecvError}, + watch, + }; + + type TestBeaconConsensusEngine = BeaconConsensusEngine< + Env, + NoopSyncStateUpdate, + TestConsensus, + TestExecutorFactory, + >; + + struct TestEnv { + db: Arc, + // Keep the tip receiver around, so it's not dropped. + #[allow(dead_code)] + tip_rx: watch::Receiver, + sync_tx: UnboundedSender, + } + + impl TestEnv { + fn new( + db: Arc, + tip_rx: watch::Receiver, + sync_tx: UnboundedSender, + ) -> Self { + Self { db, tip_rx, sync_tx } + } + + fn send_new_payload( + &self, + payload: ExecutionPayload, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + self.sync_tx + .send(BeaconEngineMessage::NewPayload { payload, tx }) + .expect("failed to send msg"); + rx + } + + fn send_forkchoice_updated( + &self, + state: ForkchoiceState, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + self.sync_tx + .send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs: None, tx }) + .expect("failed to send msg"); + rx + } + } + + fn setup_consensus_engine( + chain_spec: Arc, + pipeline_exec_outputs: VecDeque>, + executor_results: Vec, + ) -> (TestBeaconConsensusEngine, TestEnv>) { + reth_tracing::init_test_tracing(); + let db = create_test_rw_db(); + let consensus = TestConsensus::default(); + let executor_factory = TestExecutorFactory::new(chain_spec.clone()); + executor_factory.extend(executor_results); + + // Setup pipeline + let (tip_tx, tip_rx) = watch::channel(H256::default()); + let pipeline = Pipeline::builder() + .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx) + .build(); + + // Setup blockchain tree + let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); + let config = BlockchainTreeConfig::new(1, 2, 3); + let tree = BlockchainTree::new(externals, config).expect("failed to create tree"); + + let (sync_tx, sync_rx) = unbounded_channel(); + ( + BeaconConsensusEngine::new(db.clone(), pipeline, tree, sync_rx, None), + TestEnv::new(db, tip_rx, sync_tx), + ) + } + + fn spawn_consensus_engine( + engine: TestBeaconConsensusEngine, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let result = engine.await; + tx.send(result).expect("failed to forward consensus engine result"); + }); + rx + } + + // Pipeline error is propagated. + #[tokio::test] + async fn pipeline_error_is_propagated() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Err(StageError::ChannelClosed)]), + Vec::default(), + ); + let rx = spawn_consensus_engine(consensus_engine); + + let _ = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + ..Default::default() + }); + assert_matches!( + rx.await, + Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + ); + } + + // Test that the consensus engine is idle until first forkchoice updated is received. + #[tokio::test] + async fn is_idle_until_forkchoice_is_set() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Err(StageError::ChannelClosed)]), + Vec::default(), + ); + let mut rx = spawn_consensus_engine(consensus_engine); + + // consensus engine is idle + std::thread::sleep(Duration::from_millis(100)); + assert_matches!(rx.try_recv(), Err(TryRecvError::Empty)); + + // consensus engine is still idle + let _ = env.send_new_payload(SealedBlock::default().into()); + assert_matches!(rx.try_recv(), Err(TryRecvError::Empty)); + + // consensus engine receives a forkchoice state and triggers the pipeline + let _ = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + ..Default::default() + }); + assert_matches!( + rx.await, + Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + ); + } + + // Test that the consensus engine runs the pipeline again if the tree cannot be restored. + // The consensus engine will propagate the second result (error) only if it runs the pipeline + // for the second time. + #[tokio::test] + async fn runs_pipeline_again_if_tree_not_restored() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([ + Ok(ExecOutput { stage_progress: 1, done: true }), + Err(StageError::ChannelClosed), + ]), + Vec::default(), + ); + let rx = spawn_consensus_engine(consensus_engine); + + let _ = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + ..Default::default() + }); + + assert_matches!( + rx.await, + Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + ); + } + + #[tokio::test] + async fn terminates_upon_reaching_max_block() { + let max_block = 1000; + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (mut consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { stage_progress: max_block, done: true })]), + Vec::default(), + ); + consensus_engine.max_block = Some(max_block); + let rx = spawn_consensus_engine(consensus_engine); + + let _ = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + ..Default::default() + }); + assert_matches!(rx.await, Ok(Ok(()))); + } + + fn insert_blocks<'a, DB: Database>(db: &DB, mut blocks: impl Iterator) { + let mut transaction = Transaction::new(db).unwrap(); + blocks + .try_for_each(|b| { + transaction + .insert_block(SealedBlockWithSenders::new(b.clone(), Vec::default()).unwrap()) + }) + .expect("failed to insert"); + transaction.commit().unwrap(); + } + + mod fork_choice_updated { + use super::*; + use reth_interfaces::test_utils::generators::random_block; + + #[tokio::test] + async fn empty_head() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + let rx = env.send_forkchoice_updated(ForkchoiceState::default()); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { + validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), + }); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn valid_forkchoice() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + let block1 = random_block(1, Some(genesis.hash), None, Some(0)); + insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + let forkchoice = ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }; + + let rx_invalid = env.send_forkchoice_updated(forkchoice); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx_invalid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + let rx_valid = env.send_forkchoice_updated(forkchoice); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid); + assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn unknown_head_hash() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([ + Ok(ExecOutput { done: true, stage_progress: 0 }), + Ok(ExecOutput { done: true, stage_progress: 0 }), + Ok(ExecOutput { done: true, stage_progress: 0 }), + ]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + let block1 = random_block(1, Some(genesis.hash), None, Some(0)); + insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + let invalid_forkchoice_state = ForkchoiceState { + head_block_hash: H256::random(), + finalized_block_hash: block1.hash, + ..Default::default() + }; + + let rx = env.send_forkchoice_updated(invalid_forkchoice_state); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + let rx = env.send_forkchoice_updated(invalid_forkchoice_state); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + let rx_valid = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn unknown_finalized_hash() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + let block1 = random_block(1, Some(genesis.hash), None, Some(0)); + insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); + + let _ = spawn_consensus_engine(consensus_engine); + + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + } + + #[tokio::test] + async fn forkchoice_updated_invalid_pow() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .london_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([ + Ok(ExecOutput { done: true, stage_progress: 0 }), + Ok(ExecOutput { done: true, stage_progress: 0 }), + ]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + let block1 = random_block(1, Some(genesis.hash), None, Some(0)); + + insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); + + let _ = spawn_consensus_engine(consensus_engine); + + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { + validation_error: ExecutorError::BlockPreMerge { hash: block1.hash }.to_string(), + }) + .with_latest_valid_hash(H256::zero()); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + } + } + + mod new_payload { + use super::*; + use reth_interfaces::{ + executor::Error as ExecutorError, test_utils::generators::random_block, + }; + use reth_primitives::{Hardfork, U256}; + use reth_provider::test_utils::blocks::BlockChainTestData; + + #[tokio::test] + async fn new_payload_before_forkchoice() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + // Send new payload + let rx = env.send_new_payload(random_block(0, None, None, Some(0)).into()); + // Invalid, because this is a genesis block + assert_matches!(rx.await, Ok(Ok(result)) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. })); + + // Send new payload + let rx = env.send_new_payload(random_block(1, None, None, Some(0)).into()); + let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn payload_known() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + let block1 = random_block(1, Some(genesis.hash), None, Some(0)); + let block2 = random_block(2, Some(block1.hash), None, Some(0)); + insert_blocks(env.db.as_ref(), [&genesis, &block1, &block2].into_iter()); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + // Send forkchoice + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = + ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + // Send new payload + let rx = env.send_new_payload(block2.clone().into()); + let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid) + .with_latest_valid_hash(block2.hash); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn payload_parent_unknown() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::default(), + ); + + let genesis = random_block(0, None, None, Some(0)); + + insert_blocks(env.db.as_ref(), [&genesis].into_iter()); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + // Send forkchoice + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: genesis.hash, + finalized_block_hash: genesis.hash, + ..Default::default() + }); + let expected_result = + ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + // Send new payload + let block = random_block(2, Some(H256::random()), None, Some(0)); + let rx = env.send_new_payload(block.into()); + let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[tokio::test] + async fn payload_pre_merge() { + let data = BlockChainTestData::default(); + let mut block1 = data.blocks[0].0.block.clone(); + block1.header.difficulty = MAINNET.fork(Hardfork::Paris).ttd().unwrap() - U256::from(1); + block1 = block1.unseal().seal_slow(); + let (block2, exec_result2) = data.blocks[1].clone(); + let mut block2 = block2.block; + block2.withdrawals = None; + block2.header.parent_hash = block1.hash; + block2.header.base_fee_per_gas = Some(100); + block2.header.difficulty = U256::ZERO; + block2 = block2.unseal().seal_slow(); + + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .london_activated() + .build(), + ); + let (consensus_engine, env) = setup_consensus_engine( + chain_spec, + VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]), + Vec::from([exec_result2]), + ); + + insert_blocks(env.db.as_ref(), [&data.genesis, &block1].into_iter()); + + let mut engine_rx = spawn_consensus_engine(consensus_engine); + + // Send forkchoice + let rx = env.send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }); + let expected_result = + ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + // Send new payload + let rx = env.send_new_payload(block2.clone().into()); + let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { + validation_error: ExecutorError::BlockPreMerge { hash: block2.hash }.to_string(), + }) + .with_latest_valid_hash(H256::zero()); + assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); + } + } +} diff --git a/crates/consensus/beacon/src/engine/pipeline_state.rs b/crates/consensus/beacon/src/engine/pipeline_state.rs new file mode 100644 index 000000000..42ffe1489 --- /dev/null +++ b/crates/consensus/beacon/src/engine/pipeline_state.rs @@ -0,0 +1,26 @@ +use reth_db::database::Database; +use reth_interfaces::sync::SyncStateUpdater; +use reth_stages::{Pipeline, PipelineFut}; + +/// The possible pipeline states within the sync controller. +/// +/// [PipelineState::Idle] means that the pipeline is currently idle. +/// [PipelineState::Running] means that the pipeline is currently running. +/// +/// NOTE: The differentiation between these two states is important, because when the pipeline is +/// running, it acquires the write lock over the database. This means that we cannot forward to the +/// blockchain tree any messages that would result in database writes, since it would result in a +/// deadlock. +pub enum PipelineState { + /// Pipeline is idle. + Idle(Pipeline), + /// Pipeline is running. + Running(PipelineFut), +} + +impl PipelineState { + /// Returns `true` if the state matches idle. + pub fn is_idle(&self) -> bool { + matches!(self, PipelineState::Idle(_)) + } +} diff --git a/crates/consensus/beacon/src/lib.rs b/crates/consensus/beacon/src/lib.rs index 04d5af328..f76d1d6c1 100644 --- a/crates/consensus/beacon/src/lib.rs +++ b/crates/consensus/beacon/src/lib.rs @@ -8,3 +8,6 @@ mod beacon_consensus; pub use beacon_consensus::BeaconConsensus; + +mod engine; +pub use engine::*; diff --git a/crates/executor/src/blockchain_tree/mod.rs b/crates/executor/src/blockchain_tree/mod.rs index 6564f06ba..6ae964007 100644 --- a/crates/executor/src/blockchain_tree/mod.rs +++ b/crates/executor/src/blockchain_tree/mod.rs @@ -2,7 +2,9 @@ use chain::{BlockChainId, Chain, ForkBlock}; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{consensus::Consensus, executor::Error as ExecError, Error}; -use reth_primitives::{BlockHash, BlockNumber, SealedBlock, SealedBlockWithSenders}; +use reth_primitives::{ + BlockHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, U256, +}; use reth_provider::{ providers::ChainState, ExecutorFactory, HeaderProvider, StateProviderFactory, Transaction, }; @@ -84,7 +86,7 @@ pub enum BlockStatus { /// If block validation is valid and block extends canonical chain. /// In BlockchainTree sense it forks on canonical tip. Valid, - /// If block validation is valid but block does not extend canonical chain + /// If the block is valid, but it does not extend canonical chain /// (It is side chain) or hasn't been fully validated but ancestors of a payload are known. Accepted, /// If blocks is not connected to canonical chain. @@ -139,6 +141,11 @@ impl BlockchainTree }) } + /// Return the tip of the canonical chain + pub fn canonical_tip_number(&self) -> Option { + self.block_indices.canonical_chain().last_key_value().map(|(number, _)| *number) + } + /// Create a new sidechain by forking the given chain, or append the block if the parent block /// is the top of the given chain. fn fork_side_chain( @@ -162,11 +169,11 @@ impl BlockchainTree let canonical_block_hashes = self.block_indices.canonical_chain(); // get canonical tip - let (_, canonical_tip_hash) = - canonical_block_hashes.last_key_value().map(|(i, j)| (*i, *j)).unwrap_or_default(); + let canonical_tip = + canonical_block_hashes.last_key_value().map(|(_, hash)| *hash).unwrap_or_default(); let db = self.externals.shareable_db(); - let provider = if canonical_fork.hash == canonical_tip_hash { + let provider = if canonical_fork.hash == canonical_tip { ChainState::boxed(db.latest()?) } else { ChainState::boxed(db.history_by_block_number(canonical_fork.number)?) @@ -209,26 +216,34 @@ impl BlockchainTree &mut self, block: SealedBlockWithSenders, ) -> Result { - let canonical_block_hashes = self.block_indices.canonical_chain(); - let (_, canonical_tip) = - canonical_block_hashes.last_key_value().map(|(i, j)| (*i, *j)).unwrap_or_default(); - - // create state provider let db = self.externals.shareable_db(); - let parent_header = db - .header(&block.parent_hash)? - .ok_or(ExecError::CanonicalChain { block_hash: block.parent_hash })?; - let block_status; - let provider = if block.parent_hash == canonical_tip { - block_status = BlockStatus::Valid; - ChainState::boxed(db.latest()?) + // Validate that the block is post merge + let parent_td = db + .header_td(&block.parent_hash)? + .ok_or(ExecError::CanonicalChain { block_hash: block.parent_hash })?; + // Pass the parent total difficulty to short-circuit unnecessary calculations. + if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(parent_td, U256::ZERO) { + return Err(ExecError::BlockPreMerge { hash: block.hash }.into()) + } + + // Create state provider + let canonical_block_hashes = self.block_indices.canonical_chain(); + let canonical_tip = + canonical_block_hashes.last_key_value().map(|(_, hash)| *hash).unwrap_or_default(); + let (block_status, provider) = if block.parent_hash == canonical_tip { + (BlockStatus::Valid, ChainState::boxed(db.latest()?)) } else { - block_status = BlockStatus::Accepted; - ChainState::boxed(db.history_by_block_number(block.number - 1)?) + ( + BlockStatus::Accepted, + ChainState::boxed(db.history_by_block_number(block.number - 1)?), + ) }; - let parent_header = parent_header.seal(block.parent_hash); + let parent_header = db + .header(&block.parent_hash)? + .ok_or(ExecError::CanonicalChain { block_hash: block.parent_hash })? + .seal(block.parent_hash); let chain = Chain::new_canonical_fork( &block, &parent_header, @@ -477,13 +492,20 @@ impl BlockchainTree /// /// Returns `Ok` if the blocks were canonicalized, or if the blocks were already canonical. pub fn make_canonical(&mut self, block_hash: &BlockHash) -> Result<(), Error> { - let chain_id = if let Some(chain_id) = self.block_indices.get_blocks_chain_id(block_hash) { - chain_id - } else { - // If block is already canonical don't return error. - if self.block_indices.is_block_hash_canonical(block_hash) { - return Ok(()) + // If block is already canonical don't return error. + if self.block_indices.is_block_hash_canonical(block_hash) { + let td = self + .externals + .shareable_db() + .header_td(block_hash)? + .ok_or(ExecError::MissingTotalDifficulty { hash: *block_hash })?; + if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(td, U256::ZERO) { + return Err(ExecError::BlockPreMerge { hash: *block_hash }.into()) } + return Ok(()) + } + + let Some(chain_id) = self.block_indices.get_blocks_chain_id(block_hash) else { return Err(ExecError::BlockHashNotFoundInChain { block_hash: *block_hash }.into()) }; let chain = self.chains.remove(&chain_id).expect("To be present"); @@ -694,7 +716,7 @@ mod tests { let (mut block2, exec2) = data.blocks[1].clone(); block2.number = 12; - // test pops execution results from vector, so order is from last to first.ß + // test pops execution results from vector, so order is from last to first. let externals = setup_externals(vec![exec2.clone(), exec1.clone(), exec2, exec1]); // last finalized block would be number 9. diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index a40f60a0e..6c9ee95c5 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -7,7 +7,7 @@ use thiserror::Error; pub enum Error { #[error("EVM reported invalid transaction ({hash:?}): {message}")] EVM { hash: H256, message: String }, - #[error("Example of error.")] + #[error("Verification failed.")] VerificationFailed, #[error("Fatal internal error")] ExecutionFatalError, @@ -64,4 +64,8 @@ pub enum Error { CanonicalCommit { inner: String }, #[error("Transaction error on pipeline status update: {inner:?}")] PipelineStatusUpdate { inner: String }, + #[error("Block {hash:?} is pre merge")] + BlockPreMerge { hash: H256 }, + #[error("Missing total difficulty")] + MissingTotalDifficulty { hash: H256 }, } diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index b11b197fe..3d5ac8bbe 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -8,6 +8,7 @@ description = "Helpers for configuring RPC" [dependencies] # reth +reth-primitives = { path = "../../primitives" } reth-ipc = { path = "../ipc" } reth-interfaces = { path = "../../interfaces" } reth-network-api = { path = "../../net/network-api" } @@ -33,7 +34,6 @@ tracing = "0.1" [dev-dependencies] reth-tracing = { path = "../../tracing" } -reth-primitives = { path = "../../primitives" } reth-rpc-api = { path = "../rpc-api", features = ["client"] } reth-transaction-pool = { path = "../../transaction-pool", features = ["test-utils"] } reth-provider = { path = "../../storage/provider", features = ["test-utils"] } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 42c5d4130..41a42072a 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -1,6 +1,7 @@ pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::{core::Error as RpcError, server::ServerHandle, RpcModule}; use reth_network_api::{NetworkInfo, Peers}; +use reth_primitives::ChainSpec; use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{ eth::cache::EthStateCache, AuthLayer, EngineApi, EthApi, JwtAuthValidator, JwtSecret, @@ -9,14 +10,16 @@ use reth_rpc_api::servers::*; use reth_rpc_engine_api::EngineApiHandle; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; /// Configure and launch an auth server with `engine` and a _new_ `eth` namespace. +#[allow(clippy::too_many_arguments)] pub async fn launch( client: Client, pool: Pool, network: Network, executor: Tasks, + chain_spec: Arc, handle: EngineApiHandle, socket_addr: SocketAddr, secret: JwtSecret, @@ -35,13 +38,20 @@ where { // spawn a new cache task let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor); - launch_with_eth_api(EthApi::new(client, pool, network, eth_cache), handle, socket_addr, secret) - .await + launch_with_eth_api( + EthApi::new(client, pool, network, eth_cache), + chain_spec, + handle, + socket_addr, + secret, + ) + .await } /// Configure and launch an auth server with existing EthApi implementation. pub async fn launch_with_eth_api( eth_api: EthApi, + chain_spec: Arc, handle: EngineApiHandle, socket_addr: SocketAddr, secret: JwtSecret, @@ -59,7 +69,7 @@ where { // Configure the module and start the server. let mut module = RpcModule::new(()); - module.merge(EngineApi::new(handle).into_rpc()).expect("No conflicting methods"); + module.merge(EngineApi::new(chain_spec, handle).into_rpc()).expect("No conflicting methods"); module.merge(eth_api.into_rpc()).expect("No conflicting methods"); // Create auth middleware. diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 3be1201f4..95080375c 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -11,10 +11,10 @@ description = "Implementation of Engine API" reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-provider = { path = "../../storage/provider" } -reth-rlp = { path = "../../rlp" } reth-executor = { path = "../../executor" } reth-revm = { path = "../../revm" } reth-rpc-types = { path = "../rpc-types" } +reth-beacon-consensus = { path = "../../consensus/beacon" } # async futures = "0.3" diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index bb77f30a2..97df04353 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,27 +1,19 @@ -use crate::{message::EngineApiMessageVersion, EngineApiError, EngineApiMessage, EngineApiResult}; +use crate::{EngineApiError, EngineApiMessage, EngineApiResult}; use futures::StreamExt; -use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::{ - proofs::{self, EMPTY_LIST_HASH}, - BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, - H256, H64, U256, -}; -use reth_provider::{ - BlockExecutor, BlockProvider, EvmEnvProvider, ExecutorFactory, HeaderProvider, - StateProviderFactory, -}; -use reth_rlp::Decodable; -use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, - PayloadStatusEnum, TransitionConfiguration, -}; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, H64}; +use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; +use reth_rpc_types::engine::{ExecutionPayload, ExecutionPayloadBodies, TransitionConfiguration}; use std::{ future::Future, pin::Pin, sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + oneshot, +}; use tokio_stream::wrappers::UnboundedReceiverStream; /// The Engine API handle. @@ -41,10 +33,7 @@ pub struct EngineApi { /// Consensus configuration chain_spec: Arc, message_rx: UnboundedReceiverStream, - forkchoice_state_tx: watch::Sender, - // TODO: Placeholder for storing future blocks. Make cache bounded. Use lru - // local_store: HashMap, - // remote_store: HashMap, + engine_tx: UnboundedSender, } impl @@ -55,18 +44,16 @@ impl, message_rx: mpsc::UnboundedReceiver, - forkchoice_state_tx: watch::Sender, + engine_tx: UnboundedSender, ) -> Self { - Self { - client, - chain_spec, - message_rx: UnboundedReceiverStream::new(message_rx), - forkchoice_state_tx, - } + Self { client, chain_spec, message_rx: UnboundedReceiverStream::new(message_rx), engine_tx } } fn on_message(&mut self, msg: EngineApiMessage) { match msg { + EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => { + let _ = tx.send(self.exchange_transition_configuration(config)); + } EngineApiMessage::GetPayload(payload_id, tx) => { let _ = tx.send(self.get_payload(payload_id).ok_or(EngineApiError::PayloadUnknown)); } @@ -76,130 +63,21 @@ impl { let _ = tx.send(self.get_payload_bodies_by_range(start, count)); } - EngineApiMessage::NewPayload(version, payload, tx) => { - if let Err(err) = self.validate_withdrawals_presence( - version, - payload.timestamp.as_u64(), - payload.withdrawals.is_some(), - ) { - let _ = tx.send(Err(err)); - return - } - let _ = tx.send(self.new_payload(payload)); + EngineApiMessage::NewPayload(payload, tx) => { + // forward message to the consensus engine + let _ = self.engine_tx.send(BeaconEngineMessage::NewPayload { payload, tx }); } - EngineApiMessage::ForkchoiceUpdated(version, state, attrs, tx) => { - if let Some(attributes) = &attrs { - if let Err(err) = self.validate_withdrawals_presence( - version, - attributes.timestamp.as_u64(), - attributes.withdrawals.is_some(), - ) { - let _ = tx.send(Err(err)); - return - } - } - - let _ = tx.send(self.fork_choice_updated(state, attrs)); - } - EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => { - let _ = tx.send(self.exchange_transition_configuration(config)); + EngineApiMessage::ForkchoiceUpdated(state, payload_attrs, tx) => { + // forward message to the consensus engine + let _ = self.engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs, + tx, + }); } } } - /// Validates the presence of the `withdrawals` field according to the payload timestamp. - /// After Shanghai, withdrawals field must be [Some]. - /// Before Shanghai, withdrawals field must be [None]; - fn validate_withdrawals_presence( - &self, - version: EngineApiMessageVersion, - timestamp: u64, - has_withdrawals: bool, - ) -> EngineApiResult<()> { - let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); - - match version { - EngineApiMessageVersion::V1 => { - if is_shanghai || has_withdrawals { - return Err(EngineApiError::InvalidParams) - } - } - EngineApiMessageVersion::V2 => { - let shanghai_with_no_withdrawals = is_shanghai && !has_withdrawals; - let not_shanghai_with_withdrawals = !is_shanghai && has_withdrawals; - if shanghai_with_no_withdrawals || not_shanghai_with_withdrawals { - return Err(EngineApiError::InvalidParams) - } - } - }; - - Ok(()) - } - - /// Try to construct a block from given payload. Perform addition validation of `extra_data` and - /// `base_fee_per_gas` fields. - /// - /// NOTE: The log bloom is assumed to be validated during serialization. - /// NOTE: Empty ommers, nonce and difficulty values are validated upon computing block hash and - /// comparing the value with `payload.block_hash`. - /// - /// See - fn try_construct_block(&self, payload: ExecutionPayload) -> EngineApiResult { - if payload.extra_data.len() > 32 { - return Err(EngineApiError::PayloadExtraData(payload.extra_data)) - } - - if payload.base_fee_per_gas == U256::ZERO { - return Err(EngineApiError::PayloadBaseFee(payload.base_fee_per_gas)) - } - - let transactions = payload - .transactions - .iter() - .map(|tx| TransactionSigned::decode(&mut tx.as_ref())) - .collect::, _>>()?; - let transactions_root = proofs::calculate_transaction_root(transactions.iter()); - - let withdrawals_root = - payload.withdrawals.as_ref().map(|w| proofs::calculate_withdrawals_root(w.iter())); - - let header = Header { - parent_hash: payload.parent_hash, - beneficiary: payload.fee_recipient, - state_root: payload.state_root, - transactions_root, - receipts_root: payload.receipts_root, - withdrawals_root, - logs_bloom: payload.logs_bloom, - number: payload.block_number.as_u64(), - gas_limit: payload.gas_limit.as_u64(), - gas_used: payload.gas_used.as_u64(), - timestamp: payload.timestamp.as_u64(), - mix_hash: payload.prev_randao, - base_fee_per_gas: Some(payload.base_fee_per_gas.to::()), - extra_data: payload.extra_data, - // Defaults - ommers_hash: EMPTY_LIST_HASH, - difficulty: Default::default(), - nonce: Default::default(), - } - .seal_slow(); - - if payload.block_hash != header.hash() { - return Err(EngineApiError::PayloadBlockHash { - execution: header.hash(), - consensus: payload.block_hash, - }) - } - - Ok(SealedBlock { - header, - body: transactions, - withdrawals: payload.withdrawals, - ommers: Default::default(), - }) - } - /// Called to retrieve the latest state of the network, validate new blocks, and maintain /// consistency between the Consensus and Execution layers. /// @@ -225,7 +103,10 @@ impl EngineApiResult { - let block = match self.try_construct_block(payload) { - Ok(b) => b, - Err(err) => { - return Ok(PayloadStatus::from_status(PayloadStatusEnum::InvalidBlockHash { - validation_error: err.to_string(), - })) - } - }; - let block_hash = block.header.hash(); - let parent_hash = block.parent_hash; - - // The block already exists in our database - if self.client.is_known(&block_hash)? { - return Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)) - } - - let Some(parent) = self.client.block_by_hash(parent_hash)? else { - // TODO: cache block for storing later - return Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)) - }; - - let Some(parent_td) = self.client.header_td(&block.parent_hash)? else { return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadPreMerge.to_string(), - })) }; - - // Short circuit the check by passing parent total difficulty. - if !self.chain_spec.fork(Hardfork::Paris).active_at_ttd(parent_td, U256::ZERO) { - // This case returns a `latestValidHash` of zero because it is required by the engine - // api spec: - // - // Client software MUST respond to this method call in the following way: - // { - // status: INVALID, - // latestValidHash: - // 0x0000000000000000000000000000000000000000000000000000000000000000, - // validationError: errorMessage | null - // } - // - // if terminal block conditions are not satisfied - return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadPreMerge.to_string(), - }) - .with_latest_valid_hash(H256::zero())) - } - - if block.timestamp <= parent.timestamp { - return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadTimestamp { - invalid: block.timestamp, - latest: parent.timestamp, - } - .to_string(), - })) - } - - let state_provider = self.client.latest()?; - let total_difficulty = parent_td + block.header.difficulty; - - let factory = reth_executor::Factory::new(self.chain_spec.clone()); - let mut executor = factory.with_sp(&state_provider); - match executor.execute_and_verify_receipt(&block.unseal(), total_difficulty, None) { - Ok(_) => Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)), - Err(err) => Ok(PayloadStatus::new( - PayloadStatusEnum::Invalid { validation_error: err.to_string() }, - parent_hash, // The parent hash is already in our database hence it is valid - )), - } - } - - /// Called to resolve chain forks and ensure that the Execution layer is working with the latest - /// valid chain. - /// - /// These responses should adhere to the [Engine API Spec for - /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1). - pub fn fork_choice_updated( - &self, - fork_choice_state: ForkchoiceState, - payload_attributes: Option, - ) -> EngineApiResult { - let ForkchoiceState { head_block_hash, finalized_block_hash, .. } = fork_choice_state; - - if head_block_hash.is_zero() { - return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(), - })) - } - - let Some(head) = self.client.header(&head_block_hash)? else { - // Block is not known, nothing to do - return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing)) - }; - - // The finalized block hash is not known, we are still syncing - if !finalized_block_hash.is_zero() && !self.client.is_known(&finalized_block_hash)? { - return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing)) - } - - let Some(head_td) = self.client.header_td(&head_block_hash)? else { - // internal error - we have the head block but not the total difficulty - return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::Internal( - reth_interfaces::provider::ProviderError::TotalDifficulty { - number: head.number, - } - .into(), - ) - .to_string(), - })) - }; - - // From the Engine API spec: - // - // If forkchoiceState.headBlockHash references a PoW block, client software MUST validate - // this block with respect to terminal block conditions according to EIP-3675. This check - // maps to the transition block validity section of the EIP. Additionally, if this - // validation fails, client software MUST NOT update the forkchoice state and MUST NOT - // begin a payload build process. - // - // We use ZERO here because as long as the total difficulty is above the ttd, we are sure - // that the block is EITHER: - // * The terminal PoW block, or - // * A child of the terminal PoW block - // - // Using the head.difficulty instead of U256::ZERO here would be incorrect because it would - // not return true on the terminal PoW block. For the terminal PoW block, head_td - - // head.difficulty would be less than the TTD, causing active_at_ttd to return false. - if !self.chain_spec.fork(Hardfork::Paris).active_at_ttd(head_td, U256::ZERO) { - // This case returns a `latestValidHash` of zero because it is required by the engine - // api spec: - // - // Client software MUST respond to this method call in the following way: - // { - // status: INVALID, - // latestValidHash: - // 0x0000000000000000000000000000000000000000000000000000000000000000, - // validationError: errorMessage | null - // } - // obtained either from the Payload validation process or as a result of validating a - // terminal PoW block referenced by forkchoiceState.headBlockHash - return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadPreMerge.to_string(), - }) - .with_latest_valid_hash(H256::zero())) - } - - if let Err(error) = self.forkchoice_state_tx.send(fork_choice_state) { - tracing::error!(target: "rpc::engine_api", ?error, "Failed to update forkchoice state"); - } - - if let Some(_attr) = payload_attributes { - // TODO: optionally build the block - } - - let chain_info = self.client.chain_info()?; - Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid) - .with_latest_valid_hash(chain_info.best_hash)) - } - /// Called to verify network configuration parameters and ensure that Consensus and Execution /// layers are using the latest configuration. pub fn exchange_transition_configuration( @@ -454,7 +170,10 @@ impl (EngineApiTestHandle, EngineApi>) { let chain_spec = Arc::new(MAINNET.clone()); let client = Arc::new(MockEthProvider::default()); let (msg_tx, msg_rx) = unbounded_channel(); - let (forkchoice_state_tx, forkchoice_state_rx) = watch::channel(ForkchoiceState::default()); + let (engine_tx, engine_rx) = mpsc::unbounded_channel(); let api = EngineApi { client: client.clone(), chain_spec: chain_spec.clone(), message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx, + engine_tx, }; - let handle = EngineApiTestHandle { chain_spec, client, msg_tx, forkchoice_state_rx }; + let handle = EngineApiTestHandle { chain_spec, client, msg_tx, engine_rx }; (handle, api) } @@ -523,254 +239,37 @@ mod tests { chain_spec: Arc, client: Arc, msg_tx: UnboundedSender, - forkchoice_state_rx: WatchReceiver, + engine_rx: UnboundedReceiver, } impl EngineApiTestHandle { fn send_message(&self, msg: EngineApiMessage) { self.msg_tx.send(msg).expect("failed to send engine msg"); } - - fn forkchoice_state(&self) -> ForkchoiceState { - self.forkchoice_state_rx.borrow().clone() - } - - fn forkchoice_state_has_changed(&self) -> bool { - self.forkchoice_state_rx.has_changed().unwrap() - } } - mod new_payload { - use super::*; - use reth_interfaces::test_utils::generators::random_header; - use reth_primitives::{ - bytes::{Bytes, BytesMut}, - Block, - }; - use reth_rlp::DecodeError; + #[tokio::test] + async fn forwards_responses_to_consensus_engine() { + let (mut handle, api) = setup_engine_api(); + tokio::spawn(api); - fn transform_block Block>(src: SealedBlock, f: F) -> SealedBlock { - let unsealed = src.unseal(); - let mut transformed: Block = f(unsealed); - // Recalculate roots - transformed.header.transactions_root = - proofs::calculate_transaction_root(transformed.body.iter()); - transformed.header.ommers_hash = - proofs::calculate_ommers_root(transformed.ommers.iter()); - SealedBlock { - header: transformed.header.seal_slow(), - body: transformed.body, - ommers: transformed.ommers.into_iter().map(Header::seal_slow).collect(), - withdrawals: transformed.withdrawals, - } - } + let (result_tx, _result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::NewPayload(SealedBlock::default().into(), result_tx)); + assert_matches!( + handle.engine_rx.recv().await, + Some(BeaconEngineMessage::NewPayload { .. }) + ); - #[tokio::test] - async fn payload_validation() { - let (_, api) = setup_engine_api(); - - let block = random_block(100, Some(H256::random()), Some(3), Some(0)); - - // Valid extra data - let block_with_valid_extra_data = transform_block(block.clone(), |mut b| { - b.header.extra_data = BytesMut::zeroed(32).freeze().into(); - b - }); - assert_matches!(api.try_construct_block(block_with_valid_extra_data.into()), Ok(_)); - - // Invalid extra data - let block_with_invalid_extra_data: Bytes = BytesMut::zeroed(33).freeze(); - let invalid_extra_data_block = transform_block(block.clone(), |mut b| { - b.header.extra_data = block_with_invalid_extra_data.clone().into(); - b - }); - assert_matches!( - api.try_construct_block(invalid_extra_data_block.into()), - Err(EngineApiError::PayloadExtraData(data)) if data == block_with_invalid_extra_data - ); - - // Zero base fee - let block_with_zero_base_fee = transform_block(block.clone(), |mut b| { - b.header.base_fee_per_gas = Some(0); - b - }); - assert_matches!( - api.try_construct_block(block_with_zero_base_fee.into()), - Err(EngineApiError::PayloadBaseFee(val)) if val == U256::ZERO - ); - - // Invalid encoded transactions - let mut payload_with_invalid_txs: ExecutionPayload = block.clone().into(); - payload_with_invalid_txs.transactions.iter_mut().for_each(|tx| { - *tx = Bytes::new().into(); - }); - assert_matches!( - api.try_construct_block(payload_with_invalid_txs), - Err(EngineApiError::Decode(DecodeError::InputTooShort)) - ); - - // Non empty ommers - let block_with_ommers = transform_block(block.clone(), |mut b| { - b.ommers.push(random_header(100, None).unseal()); - b - }); - assert_matches!( - api.try_construct_block(block_with_ommers.clone().into()), - Err(EngineApiError::PayloadBlockHash { consensus, .. }) - if consensus == block_with_ommers.hash() - ); - - // None zero difficulty - let block_with_difficulty = transform_block(block.clone(), |mut b| { - b.header.difficulty = U256::from(1); - b - }); - assert_matches!( - api.try_construct_block(block_with_difficulty.clone().into()), - Err(EngineApiError::PayloadBlockHash { consensus, .. }) - if consensus == block_with_difficulty.hash() - ); - - // None zero nonce - let block_with_nonce = transform_block(block.clone(), |mut b| { - b.header.nonce = 1; - b - }); - assert_matches!( - api.try_construct_block(block_with_nonce.clone().into()), - Err(EngineApiError::PayloadBlockHash { consensus, .. }) - if consensus == block_with_nonce.hash() - ); - - // Valid block - let valid_block = block; - assert_matches!(api.try_construct_block(valid_block.into()), Ok(_)); - } - - #[tokio::test] - async fn payload_known() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers - let block_hash = block.hash(); - let execution_payload = block.clone().into(); - - handle.client.add_header(block_hash, block.header.unseal()); - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::NewPayload( - EngineApiMessageVersion::V1, - execution_payload, - result_tx, - )); - - let expected_result = PayloadStatus::new(PayloadStatusEnum::Valid, block_hash); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - } - - #[tokio::test] - async fn payload_parent_unknown() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let (result_tx, result_rx) = oneshot::channel(); - let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers - handle.send_message(EngineApiMessage::NewPayload( - EngineApiMessageVersion::V1, - block.into(), - result_tx, - )); - - let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - } - - #[tokio::test] - async fn payload_pre_merge() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let parent = transform_block(random_block(100, None, None, Some(0)), |mut b| { - b.header.difficulty = - handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() - U256::from(1); - b - }); - let block = random_block(101, Some(parent.hash()), None, Some(0)); - - handle.client.add_block(parent.hash(), parent.clone().unseal()); - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::NewPayload( - EngineApiMessageVersion::V1, - block.clone().into(), - result_tx, - )); - - let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadPreMerge.to_string(), - }) - .with_latest_valid_hash(H256::zero()); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - } - - #[tokio::test] - async fn invalid_payload_timestamp() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let block_timestamp = 100; - let parent_timestamp = block_timestamp + 10; - let parent = transform_block(random_block(100, None, None, Some(0)), |mut b| { - b.header.timestamp = parent_timestamp; - b.header.difficulty = - handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() + U256::from(1); - b - }); - let block = - transform_block(random_block(101, Some(parent.hash()), None, Some(0)), |mut b| { - b.header.timestamp = block_timestamp; - b - }); - - handle.client.add_block(parent.hash(), parent.clone().unseal()); - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::NewPayload( - EngineApiMessageVersion::V1, - block.clone().into(), - result_tx, - )); - - let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadTimestamp { - invalid: block_timestamp, - latest: parent_timestamp, - } - .to_string(), - }); - assert_matches!( result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - } - - // TODO: add execution tests - } - - // non exhaustive tests for engine_getPayload - // TODO: amend when block building is implemented - mod get_payload { - use super::*; - - #[tokio::test] - async fn payload_unknown() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let payload_id = H64::random(); - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::GetPayload(payload_id, result_tx)); - - assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadUnknown))); - } + let (result_tx, _result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::ForkchoiceUpdated( + ForkchoiceState::default(), + None, + result_tx, + )); + assert_matches!( + handle.engine_rx.recv().await, + Some(BeaconEngineMessage::ForkchoiceUpdated { .. }) + ); } // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash` @@ -892,177 +391,10 @@ mod tests { } } - mod fork_choice_updated { - use super::*; - use reth_interfaces::test_utils::generators::random_header; - - #[tokio::test] - async fn empty_head() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - ForkchoiceState::default(), - None, - result_tx, - )); - - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(), - }); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - assert!(!handle.forkchoice_state_has_changed()); - } - - #[tokio::test] - async fn unknown_head_hash() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let state = ForkchoiceState { head_block_hash: H256::random(), ..Default::default() }; - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - state, - None, - result_tx, - )); - - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - assert!(!handle.forkchoice_state_has_changed()); - } - - #[tokio::test] - async fn unknown_finalized_hash() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let head = random_header(100, None); - handle.client.add_header(head.hash(), head.clone().unseal()); - - let state = ForkchoiceState { - head_block_hash: head.hash(), - finalized_block_hash: H256::random(), - ..Default::default() - }; - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - state, - None, - result_tx, - )); - - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - assert!(!handle.forkchoice_state_has_changed()); - } - - #[tokio::test] - async fn forkchoice_state_is_updated() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let ttd = handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(); - let finalized = random_header(90, None); - let mut head = random_header(100, None).unseal(); - - // set the difficulty so we know it is post-merge - head.difficulty = ttd; - let head = head.seal_slow(); - handle.client.extend_headers([ - (head.hash(), head.clone().unseal()), - (finalized.hash(), finalized.clone().unseal()), - ]); - - let state = ForkchoiceState { - head_block_hash: head.hash(), - finalized_block_hash: finalized.hash(), - ..Default::default() - }; - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - state.clone(), - None, - result_tx, - )); - - let expected_result = ForkchoiceUpdated { - payload_id: None, - payload_status: PayloadStatus { - status: PayloadStatusEnum::Valid, - latest_valid_hash: Some(head.hash()), - }, - }; - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - - assert!(handle.forkchoice_state_has_changed()); - assert_eq!(handle.forkchoice_state(), state); - } - - #[tokio::test] - async fn forkchoice_updated_invalid_pow() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); - - let finalized = random_header(90, None); - let mut head = random_header(100, None).unseal(); - - // ensure we don't mess up when subtracting just in case - let ttd = handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(); - assert!(ttd > finalized.difficulty); - - // set the difficulty so we know it is post-merge - head.difficulty = ttd - U256::from(1) - finalized.difficulty; - let head = head.seal_slow(); - handle.client.extend_headers([ - (head.hash(), head.clone().unseal()), - (finalized.hash(), finalized.clone().unseal()), - ]); - - let state = ForkchoiceState { - head_block_hash: head.hash(), - finalized_block_hash: finalized.hash(), - ..Default::default() - }; - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - state.clone(), - None, - result_tx, - )); - - let expected_result = ForkchoiceUpdated { - payload_id: None, - payload_status: PayloadStatus { - status: PayloadStatusEnum::Invalid { - validation_error: EngineApiError::PayloadPreMerge.to_string(), - }, - latest_valid_hash: Some(H256::zero()), - }, - }; - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - - // From the engine API spec: - // - // Additionally, if this validation fails, client software MUST NOT update the - // forkchoice state and MUST NOT begin a payload build process. - assert!(!handle.forkchoice_state_has_changed()); - } - } - // https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-3 mod exchange_transition_configuration { use super::*; + use reth_primitives::U256; #[tokio::test] async fn terminal_td_mismatch() { @@ -1081,11 +413,11 @@ mod tests { result_tx, )); - let expected_error = EngineApiError::TerminalTD { - execution: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(), - consensus: U256::from(transition_config.terminal_total_difficulty), - }; - assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); + assert_matches!( + result_rx.await, + Ok(Err(EngineApiError::TerminalTD { execution, consensus })) + if execution == handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() && consensus == U256::from(transition_config.terminal_total_difficulty) + ); } #[tokio::test] @@ -1109,12 +441,11 @@ mod tests { transition_config.clone(), result_tx, )); - - let expected_error = EngineApiError::TerminalBlockHash { - execution: None, - consensus: transition_config.terminal_block_hash, - }; - assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); + assert_matches!( + result_rx.await, + Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) + if execution == None && consensus == transition_config.terminal_block_hash + ); // Add block and to provider local store and test for mismatch handle.client.add_block( @@ -1128,11 +459,11 @@ mod tests { result_tx, )); - let expected_error = EngineApiError::TerminalBlockHash { - execution: Some(execution_terminal_block.hash()), - consensus: transition_config.terminal_block_hash, - }; - assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); + assert_matches!( + result_rx.await, + Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) + if execution == Some(execution_terminal_block.hash()) && consensus == transition_config.terminal_block_hash + ); } #[tokio::test] diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 49c7c1afc..2a3128463 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -1,4 +1,5 @@ -use reth_primitives::{Bytes, H256, U256}; +use reth_beacon_consensus::BeaconEngineError; +use reth_primitives::{H256, U256}; use thiserror::Error; /// The Engine API result type @@ -10,39 +11,8 @@ pub const UNKNOWN_PAYLOAD_CODE: i32 = -38001; pub const REQUEST_TOO_LARGE_CODE: i32 = -38004; /// Error returned by [`EngineApi`][crate::EngineApi] -#[derive(Error, PartialEq, Debug)] +#[derive(Error, Debug)] pub enum EngineApiError { - /// Invalid payload extra data. - #[error("Invalid payload extra data: {0}")] - PayloadExtraData(Bytes), - /// Invalid payload base fee. - #[error("Invalid payload base fee: {0}")] - PayloadBaseFee(U256), - /// Invalid payload block hash. - #[error("Invalid payload block hash. Execution: {execution}. Consensus: {consensus}")] - PayloadBlockHash { - /// The block hash computed from the payload. - execution: H256, - /// The block hash provided with the payload. - consensus: H256, - }, - /// Invalid payload block hash. - #[error("Invalid payload timestamp: {invalid}. Latest: {latest}")] - PayloadTimestamp { - /// The payload timestamp. - invalid: u64, - /// Latest available timestamp. - latest: u64, - }, - /// Failed to recover transaction signer. - #[error("Failed to recover signer for payload transaction: {hash:?}")] - PayloadSignerRecovery { - /// The hash of the failed transaction - hash: H256, - }, - /// Received pre-merge payload. - #[error("Received pre-merge payload.")] - PayloadPreMerge, /// Unknown payload requested. #[error("Unknown payload")] PayloadUnknown, @@ -75,16 +45,10 @@ pub enum EngineApiError { /// Consensus terminal block hash. consensus: H256, }, - /// Forkchoice zero hash head received. - #[error("Received zero hash as forkchoice head")] - ForkchoiceEmptyHead, - /// Chain spec merge terminal total difficulty is not set - #[error("The merge terminal total difficulty is not known")] - UnknownMergeTerminalTotalDifficulty, - /// Encountered decoding error. + /// Beacon consensus engine error. #[error(transparent)] - Decode(#[from] reth_rlp::DecodeError), - /// API encountered an internal error. + ConsensusEngine(#[from] BeaconEngineError), + /// Encountered an internal error. #[error(transparent)] - Internal(#[from] reth_interfaces::Error), + Internal(Box), } diff --git a/crates/rpc/rpc-engine-api/src/message.rs b/crates/rpc/rpc-engine-api/src/message.rs index 2f6707ced..9bc7ce11e 100644 --- a/crates/rpc/rpc-engine-api/src/message.rs +++ b/crates/rpc/rpc-engine-api/src/message.rs @@ -1,4 +1,5 @@ use crate::EngineApiSender; +use reth_beacon_consensus::BeaconEngineSender; use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_types::engine::{ @@ -9,26 +10,25 @@ use reth_rpc_types::engine::{ /// Message type for communicating with [`EngineApi`][crate::EngineApi]. #[derive(Debug)] pub enum EngineApiMessage { - /// New payload message - NewPayload(EngineApiMessageVersion, ExecutionPayload, EngineApiSender), /// Get payload message GetPayload(H64, EngineApiSender), /// Get payload bodies by range message GetPayloadBodiesByRange(BlockNumber, u64, EngineApiSender), /// Get payload bodies by hash message GetPayloadBodiesByHash(Vec, EngineApiSender), - /// Forkchoice updated message - ForkchoiceUpdated( - EngineApiMessageVersion, - ForkchoiceState, - Option, - EngineApiSender, - ), /// Exchange transition configuration message ExchangeTransitionConfiguration( TransitionConfiguration, EngineApiSender, ), + /// New payload message + NewPayload(ExecutionPayload, BeaconEngineSender), + /// Forkchoice updated message + ForkchoiceUpdated( + ForkchoiceState, + Option, + BeaconEngineSender, + ), } /// The version of Engine API message. diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index dcf39ef79..870ff1d77 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -8,6 +8,7 @@ readme = "README.md" description = """ Reth RPC types """ + [dependencies] # reth reth-primitives = { path = "../../primitives" } @@ -27,5 +28,9 @@ jsonrpsee-types = { version = "0.16" } lru = "0.9" [dev-dependencies] -rand = "0.8" +# reth reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } + +# misc +rand = "0.8" +assert_matches = "1.5" \ No newline at end of file diff --git a/crates/rpc/rpc-types/src/eth/engine.rs b/crates/rpc/rpc-types/src/eth/engine.rs deleted file mode 100644 index 16e67d803..000000000 --- a/crates/rpc/rpc-types/src/eth/engine.rs +++ /dev/null @@ -1,235 +0,0 @@ -//! Engine API types: and following the execution specs - -#![allow(missing_docs)] - -use reth_primitives::{ - Address, Block, Bloom, Bytes, SealedBlock, Withdrawal, H256, H64, U256, U64, -}; -use reth_rlp::Encodable; -use serde::{Deserialize, Serialize}; - -/// The list of supported Engine capabilities -pub const CAPABILITIES: [&str; 9] = [ - "engine_forkchoiceUpdatedV1", - "engine_forkchoiceUpdatedV2", - "engine_exchangeTransitionConfigurationV1", - "engine_getPayloadV1", - "engine_getPayloadV2", - "engine_newPayloadV1", - "engine_newPayloadV2", - "engine_getPayloadBodiesByHashV1", - "engine_getPayloadBodiesByRangeV1", -]; - -/// This structure maps on the ExecutionPayload structure of the beacon chain spec. -/// -/// See also: -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ExecutionPayload { - pub parent_hash: H256, - pub fee_recipient: Address, - pub state_root: H256, - pub receipts_root: H256, - pub logs_bloom: Bloom, - pub prev_randao: H256, - pub block_number: U64, - pub gas_limit: U64, - pub gas_used: U64, - pub timestamp: U64, - pub extra_data: Bytes, - pub base_fee_per_gas: U256, - pub block_hash: H256, - pub transactions: Vec, - /// Array of [`Withdrawal`] enabled with V2 - /// See - #[serde(default, skip_serializing_if = "Option::is_none")] - pub withdrawals: Option>, -} - -impl From for ExecutionPayload { - fn from(value: SealedBlock) -> Self { - let transactions = value - .body - .iter() - .map(|tx| { - let mut encoded = Vec::new(); - tx.encode(&mut encoded); - encoded.into() - }) - .collect(); - ExecutionPayload { - parent_hash: value.parent_hash, - fee_recipient: value.beneficiary, - state_root: value.state_root, - receipts_root: value.receipts_root, - logs_bloom: value.logs_bloom, - prev_randao: value.mix_hash, - block_number: value.number.into(), - gas_limit: value.gas_limit.into(), - gas_used: value.gas_used.into(), - timestamp: value.timestamp.into(), - extra_data: value.extra_data.clone(), - base_fee_per_gas: U256::from(value.base_fee_per_gas.unwrap_or_default()), - block_hash: value.hash(), - transactions, - withdrawals: value.withdrawals, - } - } -} - -/// This structure contains a body of an execution payload. -/// -/// See also: -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct ExecutionPayloadBody { - pub transactions: Vec, - pub withdrawals: Vec, -} - -impl From for ExecutionPayloadBody { - fn from(value: Block) -> Self { - let transactions = value.body.into_iter().map(|tx| { - let mut out = Vec::new(); - tx.encode(&mut out); - out.into() - }); - ExecutionPayloadBody { - transactions: transactions.collect(), - withdrawals: value.withdrawals.unwrap_or_default(), - } - } -} - -/// The execution payload body response that allows for `null` values. -pub type ExecutionPayloadBodies = Vec>; - -/// This structure encapsulates the fork choice state -#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ForkchoiceState { - pub head_block_hash: H256, - pub safe_block_hash: H256, - pub finalized_block_hash: H256, -} - -/// This structure contains the attributes required to initiate a payload build process in the -/// context of an `engine_forkchoiceUpdated` call. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PayloadAttributes { - pub timestamp: U64, - pub prev_randao: H256, - pub suggested_fee_recipient: Address, - /// Array of [`Withdrawal`] enabled with V2 - /// See - #[serde(default, skip_serializing_if = "Option::is_none")] - pub withdrawals: Option>, -} - -/// This structure contains the result of processing a payload -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PayloadStatus { - #[serde(flatten)] - pub status: PayloadStatusEnum, - /// Hash of the most recent valid block in the branch defined by payload and its ancestors - pub latest_valid_hash: Option, -} - -impl PayloadStatus { - pub fn new(status: PayloadStatusEnum, latest_valid_hash: H256) -> Self { - Self { status, latest_valid_hash: Some(latest_valid_hash) } - } - - pub fn from_status(status: PayloadStatusEnum) -> Self { - Self { status, latest_valid_hash: None } - } - - pub fn with_latest_valid_hash(mut self, latest_valid_hash: H256) -> Self { - self.latest_valid_hash = Some(latest_valid_hash); - self - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "status", rename_all = "SCREAMING_SNAKE_CASE")] -pub enum PayloadStatusEnum { - Valid, - Invalid { - #[serde(rename = "validationError")] - validation_error: String, - }, - Syncing, - Accepted, - InvalidBlockHash { - #[serde(rename = "validationError")] - validation_error: String, - }, -} - -/// This structure contains configurable settings of the transition process. -#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct TransitionConfiguration { - /// Maps on the TERMINAL_TOTAL_DIFFICULTY parameter of EIP-3675 - pub terminal_total_difficulty: U256, - /// Maps on TERMINAL_BLOCK_HASH parameter of EIP-3675 - pub terminal_block_hash: H256, - /// Maps on TERMINAL_BLOCK_NUMBER parameter of EIP-3675 - pub terminal_block_number: U64, -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ForkchoiceUpdated { - pub payload_status: PayloadStatus, - pub payload_id: Option, -} - -impl ForkchoiceUpdated { - pub fn new(payload_status: PayloadStatus) -> Self { - Self { payload_status, payload_id: None } - } - - pub fn from_status(status: PayloadStatusEnum) -> Self { - Self { payload_status: PayloadStatus::from_status(status), payload_id: None } - } - - pub fn with_latest_valid_hash(mut self, hash: H256) -> Self { - self.payload_status.latest_valid_hash = Some(hash); - self - } - - pub fn with_payload_id(mut self, id: H64) -> Self { - self.payload_id = Some(id); - self - } -} - -#[cfg(test)] -mod tests { - use super::*; - use reth_interfaces::test_utils::generators::random_block_range; - use reth_primitives::{TransactionSigned, H256}; - use reth_rlp::Decodable; - - #[test] - fn payload_body_roundtrip() { - for block in random_block_range(0..100, H256::default(), 0..2) { - let unsealed = block.clone().unseal(); - let payload_body: ExecutionPayloadBody = unsealed.into(); - - assert_eq!( - Ok(block.body), - payload_body - .transactions - .iter() - .map(|x| TransactionSigned::decode(&mut &x[..])) - .collect::, _>>(), - ); - - assert_eq!(block.withdrawals.unwrap_or_default(), payload_body.withdrawals); - } - } -} diff --git a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs new file mode 100644 index 000000000..ef251f671 --- /dev/null +++ b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs @@ -0,0 +1,39 @@ +use super::{PayloadStatus, PayloadStatusEnum}; +use reth_primitives::{H256, H64}; +use serde::{Deserialize, Serialize}; + +/// This structure encapsulates the fork choice state +#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ForkchoiceState { + pub head_block_hash: H256, + pub safe_block_hash: H256, + pub finalized_block_hash: H256, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ForkchoiceUpdated { + pub payload_status: PayloadStatus, + pub payload_id: Option, +} + +impl ForkchoiceUpdated { + pub fn new(payload_status: PayloadStatus) -> Self { + Self { payload_status, payload_id: None } + } + + pub fn from_status(status: PayloadStatusEnum) -> Self { + Self { payload_status: PayloadStatus::from_status(status), payload_id: None } + } + + pub fn with_latest_valid_hash(mut self, hash: H256) -> Self { + self.payload_status.latest_valid_hash = Some(hash); + self + } + + pub fn with_payload_id(mut self, id: H64) -> Self { + self.payload_id = Some(id); + self + } +} diff --git a/crates/rpc/rpc-types/src/eth/engine/mod.rs b/crates/rpc/rpc-types/src/eth/engine/mod.rs new file mode 100644 index 000000000..2a814374f --- /dev/null +++ b/crates/rpc/rpc-types/src/eth/engine/mod.rs @@ -0,0 +1,22 @@ +//! Engine API types: and following the execution specs + +#![allow(missing_docs)] + +mod forkchoice; +mod payload; +mod transition; + +pub use self::{forkchoice::*, payload::*, transition::*}; + +/// The list of supported Engine capabilities +pub const CAPABILITIES: [&str; 9] = [ + "engine_forkchoiceUpdatedV1", + "engine_forkchoiceUpdatedV2", + "engine_exchangeTransitionConfigurationV1", + "engine_getPayloadV1", + "engine_getPayloadV2", + "engine_newPayloadV1", + "engine_newPayloadV2", + "engine_getPayloadBodiesByHashV1", + "engine_getPayloadBodiesByRangeV1", +]; diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs new file mode 100644 index 000000000..15dd99bbe --- /dev/null +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -0,0 +1,361 @@ +use reth_primitives::{ + proofs::{self, EMPTY_LIST_HASH}, + Address, Block, Bloom, Bytes, Header, SealedBlock, TransactionSigned, Withdrawal, H256, U256, + U64, +}; +use reth_rlp::{Decodable, Encodable}; +use serde::{Deserialize, Serialize}; + +/// This structure maps on the ExecutionPayload structure of the beacon chain spec. +/// +/// See also: +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExecutionPayload { + pub parent_hash: H256, + pub fee_recipient: Address, + pub state_root: H256, + pub receipts_root: H256, + pub logs_bloom: Bloom, + pub prev_randao: H256, + pub block_number: U64, + pub gas_limit: U64, + pub gas_used: U64, + pub timestamp: U64, + pub extra_data: Bytes, + pub base_fee_per_gas: U256, + pub block_hash: H256, + pub transactions: Vec, + /// Array of [`Withdrawal`] enabled with V2 + /// See + #[serde(default, skip_serializing_if = "Option::is_none")] + pub withdrawals: Option>, +} + +impl From for ExecutionPayload { + fn from(value: SealedBlock) -> Self { + let transactions = value + .body + .iter() + .map(|tx| { + let mut encoded = Vec::new(); + tx.encode(&mut encoded); + encoded.into() + }) + .collect(); + ExecutionPayload { + parent_hash: value.parent_hash, + fee_recipient: value.beneficiary, + state_root: value.state_root, + receipts_root: value.receipts_root, + logs_bloom: value.logs_bloom, + prev_randao: value.mix_hash, + block_number: value.number.into(), + gas_limit: value.gas_limit.into(), + gas_used: value.gas_used.into(), + timestamp: value.timestamp.into(), + extra_data: value.extra_data.clone(), + base_fee_per_gas: U256::from(value.base_fee_per_gas.unwrap_or_default()), + block_hash: value.hash(), + transactions, + withdrawals: value.withdrawals, + } + } +} + +/// Try to construct a block from given payload. Perform addition validation of `extra_data` and +/// `base_fee_per_gas` fields. +/// +/// NOTE: The log bloom is assumed to be validated during serialization. +/// NOTE: Empty ommers, nonce and difficulty values are validated upon computing block hash and +/// comparing the value with `payload.block_hash`. +/// +/// See +impl TryFrom for SealedBlock { + type Error = PayloadError; + + fn try_from(payload: ExecutionPayload) -> Result { + if payload.extra_data.len() > 32 { + return Err(PayloadError::ExtraData(payload.extra_data)) + } + + if payload.base_fee_per_gas == U256::ZERO { + return Err(PayloadError::BaseFee(payload.base_fee_per_gas)) + } + + let transactions = payload + .transactions + .iter() + .map(|tx| TransactionSigned::decode(&mut tx.as_ref())) + .collect::, _>>()?; + let transactions_root = proofs::calculate_transaction_root(transactions.iter()); + + let withdrawals_root = + payload.withdrawals.as_ref().map(|w| proofs::calculate_withdrawals_root(w.iter())); + + let header = Header { + parent_hash: payload.parent_hash, + beneficiary: payload.fee_recipient, + state_root: payload.state_root, + transactions_root, + receipts_root: payload.receipts_root, + withdrawals_root, + logs_bloom: payload.logs_bloom, + number: payload.block_number.as_u64(), + gas_limit: payload.gas_limit.as_u64(), + gas_used: payload.gas_used.as_u64(), + timestamp: payload.timestamp.as_u64(), + mix_hash: payload.prev_randao, + base_fee_per_gas: Some(payload.base_fee_per_gas.to::()), + extra_data: payload.extra_data, + // Defaults + ommers_hash: EMPTY_LIST_HASH, + difficulty: Default::default(), + nonce: Default::default(), + } + .seal_slow(); + + if payload.block_hash != header.hash() { + return Err(PayloadError::BlockHash { + execution: header.hash(), + consensus: payload.block_hash, + }) + } + + Ok(SealedBlock { + header, + body: transactions, + withdrawals: payload.withdrawals, + ommers: Default::default(), + }) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum PayloadError { + /// Invalid payload extra data. + #[error("Invalid payload extra data: {0}")] + ExtraData(Bytes), + /// Invalid payload base fee. + #[error("Invalid payload base fee: {0}")] + BaseFee(U256), + /// Invalid payload block hash. + #[error("Invalid payload block hash. Execution: {execution}. Consensus: {consensus}")] + BlockHash { + /// The block hash computed from the payload. + execution: H256, + /// The block hash provided with the payload. + consensus: H256, + }, + /// Encountered decoding error. + #[error(transparent)] + Decode(#[from] reth_rlp::DecodeError), +} + +/// This structure contains a body of an execution payload. +/// +/// See also: +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ExecutionPayloadBody { + pub transactions: Vec, + pub withdrawals: Vec, +} + +impl From for ExecutionPayloadBody { + fn from(value: Block) -> Self { + let transactions = value.body.into_iter().map(|tx| { + let mut out = Vec::new(); + tx.encode(&mut out); + out.into() + }); + ExecutionPayloadBody { + transactions: transactions.collect(), + withdrawals: value.withdrawals.unwrap_or_default(), + } + } +} + +/// The execution payload body response that allows for `null` values. +pub type ExecutionPayloadBodies = Vec>; + +/// This structure contains the attributes required to initiate a payload build process in the +/// context of an `engine_forkchoiceUpdated` call. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PayloadAttributes { + pub timestamp: U64, + pub prev_randao: H256, + pub suggested_fee_recipient: Address, + /// Array of [`Withdrawal`] enabled with V2 + /// See + #[serde(default, skip_serializing_if = "Option::is_none")] + pub withdrawals: Option>, +} + +/// This structure contains the result of processing a payload +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PayloadStatus { + #[serde(flatten)] + pub status: PayloadStatusEnum, + /// Hash of the most recent valid block in the branch defined by payload and its ancestors + pub latest_valid_hash: Option, +} + +impl PayloadStatus { + pub fn new(status: PayloadStatusEnum, latest_valid_hash: Option) -> Self { + Self { status, latest_valid_hash } + } + + pub fn from_status(status: PayloadStatusEnum) -> Self { + Self { status, latest_valid_hash: None } + } + + pub fn with_latest_valid_hash(mut self, latest_valid_hash: H256) -> Self { + self.latest_valid_hash = Some(latest_valid_hash); + self + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "status", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum PayloadStatusEnum { + Valid, + Invalid { + #[serde(rename = "validationError")] + validation_error: String, + }, + Syncing, + Accepted, + InvalidBlockHash { + #[serde(rename = "validationError")] + validation_error: String, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use reth_interfaces::test_utils::generators::{ + random_block, random_block_range, random_header, + }; + use reth_primitives::{ + bytes::{Bytes, BytesMut}, + TransactionSigned, H256, + }; + use reth_rlp::{Decodable, DecodeError}; + + fn transform_block Block>(src: SealedBlock, f: F) -> ExecutionPayload { + let unsealed = src.unseal(); + let mut transformed: Block = f(unsealed); + // Recalculate roots + transformed.header.transactions_root = + proofs::calculate_transaction_root(transformed.body.iter()); + transformed.header.ommers_hash = proofs::calculate_ommers_root(transformed.ommers.iter()); + SealedBlock { + header: transformed.header.seal_slow(), + body: transformed.body, + ommers: transformed.ommers.into_iter().map(Header::seal_slow).collect(), + withdrawals: transformed.withdrawals, + } + .into() + } + + #[test] + fn payload_body_roundtrip() { + for block in random_block_range(0..100, H256::default(), 0..2) { + let unsealed = block.clone().unseal(); + let payload_body: ExecutionPayloadBody = unsealed.into(); + + assert_eq!( + Ok(block.body), + payload_body + .transactions + .iter() + .map(|x| TransactionSigned::decode(&mut &x[..])) + .collect::, _>>(), + ); + + assert_eq!(block.withdrawals.unwrap_or_default(), payload_body.withdrawals); + } + } + + #[test] + fn payload_validation() { + let block = random_block(100, Some(H256::random()), Some(3), Some(0)); + + // Valid extra data + let block_with_valid_extra_data = transform_block(block.clone(), |mut b| { + b.header.extra_data = BytesMut::zeroed(32).freeze().into(); + b + }); + assert_matches!(TryInto::::try_into(block_with_valid_extra_data), Ok(_)); + + // Invalid extra data + let block_with_invalid_extra_data: Bytes = BytesMut::zeroed(33).freeze().into(); + let invalid_extra_data_block = transform_block(block.clone(), |mut b| { + b.header.extra_data = block_with_invalid_extra_data.clone().into(); + b + }); + assert_matches!( + TryInto::::try_into(invalid_extra_data_block), + Err(PayloadError::ExtraData(data)) if data == block_with_invalid_extra_data + ); + + // Zero base fee + let block_with_zero_base_fee = transform_block(block.clone(), |mut b| { + b.header.base_fee_per_gas = Some(0); + b + }); + assert_matches!( + TryInto::::try_into(block_with_zero_base_fee), + Err(PayloadError::BaseFee(val)) if val == U256::ZERO + ); + + // Invalid encoded transactions + let mut payload_with_invalid_txs: ExecutionPayload = block.clone().into(); + payload_with_invalid_txs.transactions.iter_mut().for_each(|tx| { + *tx = Bytes::new().into(); + }); + assert_matches!( + TryInto::::try_into(payload_with_invalid_txs), + Err(PayloadError::Decode(DecodeError::InputTooShort)) + ); + + // Non empty ommers + let block_with_ommers = transform_block(block.clone(), |mut b| { + b.ommers.push(random_header(100, None).unseal()); + b + }); + assert_matches!( + TryInto::::try_into(block_with_ommers.clone()), + Err(PayloadError::BlockHash { consensus, .. }) + if consensus == block_with_ommers.block_hash + ); + + // None zero difficulty + let block_with_difficulty = transform_block(block.clone(), |mut b| { + b.header.difficulty = U256::from(1); + b + }); + assert_matches!( + TryInto::::try_into(block_with_difficulty.clone()), + Err(PayloadError::BlockHash { consensus, .. }) if consensus == block_with_difficulty.block_hash + ); + + // None zero nonce + let block_with_nonce = transform_block(block.clone(), |mut b| { + b.header.nonce = 1; + b + }); + assert_matches!( + TryInto::::try_into(block_with_nonce.clone()), + Err(PayloadError::BlockHash { consensus, .. }) if consensus == block_with_nonce.block_hash + ); + + // Valid block + let valid_block = block; + assert_matches!(TryInto::::try_into(valid_block), Ok(_)); + } +} diff --git a/crates/rpc/rpc-types/src/eth/engine/transition.rs b/crates/rpc/rpc-types/src/eth/engine/transition.rs new file mode 100644 index 000000000..b4addac95 --- /dev/null +++ b/crates/rpc/rpc-types/src/eth/engine/transition.rs @@ -0,0 +1,14 @@ +use reth_primitives::{H256, U256, U64}; +use serde::{Deserialize, Serialize}; + +/// This structure contains configurable settings of the transition process. +#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransitionConfiguration { + /// Maps on the TERMINAL_TOTAL_DIFFICULTY parameter of EIP-3675 + pub terminal_total_difficulty: U256, + /// Maps on TERMINAL_BLOCK_HASH parameter of EIP-3675 + pub terminal_block_hash: H256, + /// Maps on TERMINAL_BLOCK_NUMBER parameter of EIP-3675 + pub terminal_block_number: U64, +} diff --git a/crates/rpc/rpc/src/engine/mod.rs b/crates/rpc/rpc/src/engine.rs similarity index 64% rename from crates/rpc/rpc/src/engine/mod.rs rename to crates/rpc/rpc/src/engine.rs index 1b943fd07..7715ba535 100644 --- a/crates/rpc/rpc/src/engine/mod.rs +++ b/crates/rpc/rpc/src/engine.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::result::rpc_err; use async_trait::async_trait; use jsonrpsee::{ @@ -5,7 +7,7 @@ use jsonrpsee::{ types::error::INVALID_PARAMS_CODE, }; use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::{BlockHash, BlockNumber, H64}; +use reth_primitives::{BlockHash, BlockNumber, ChainSpec, Hardfork, H64}; use reth_rpc_api::EngineApiServer; use reth_rpc_engine_api::{ EngineApiError, EngineApiHandle, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, @@ -17,16 +19,30 @@ use reth_rpc_types::engine::{ }; use tokio::sync::oneshot::{self, Receiver}; +fn to_rpc_error>(error: E) -> Error { + let error = error.into(); + let code = match error { + EngineApiError::InvalidParams => INVALID_PARAMS_CODE, + EngineApiError::PayloadUnknown => UNKNOWN_PAYLOAD_CODE, + EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, + // Any other server error + _ => jsonrpsee::types::error::INTERNAL_ERROR_CODE, + }; + rpc_err(code, error.to_string(), None) +} + /// The server implementation of Engine API pub struct EngineApi { - /// Handle to the consensus engine + /// Chain spec + chain_spec: Arc, + /// Handle to the engine API implementation. engine_tx: EngineApiHandle, } impl EngineApi { /// Creates a new instance of [EngineApi]. - pub fn new(engine_tx: EngineApiHandle) -> Self { - Self { engine_tx } + pub fn new(chain_spec: Arc, engine_tx: EngineApiHandle) -> Self { + Self { chain_spec, engine_tx } } } @@ -37,22 +53,42 @@ impl std::fmt::Debug for EngineApi { } impl EngineApi { - async fn delegate_request( + /// Validates the presence of the `withdrawals` field according to the payload timestamp. + /// After Shanghai, withdrawals field must be [Some]. + /// Before Shanghai, withdrawals field must be [None]; + fn validate_withdrawals_presence( + &self, + version: EngineApiMessageVersion, + timestamp: u64, + has_withdrawals: bool, + ) -> EngineApiResult<()> { + let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); + + match version { + EngineApiMessageVersion::V1 => { + if is_shanghai || has_withdrawals { + return Err(EngineApiError::InvalidParams) + } + } + EngineApiMessageVersion::V2 => { + let shanghai_with_no_withdrawals = is_shanghai && !has_withdrawals; + let not_shanghai_with_withdrawals = !is_shanghai && has_withdrawals; + if shanghai_with_no_withdrawals || not_shanghai_with_withdrawals { + return Err(EngineApiError::InvalidParams) + } + } + }; + + Ok(()) + } + + async fn delegate_request>( &self, msg: EngineApiMessage, - rx: Receiver>, + rx: Receiver>, ) -> Result { let _ = self.engine_tx.send(msg); - rx.await.map_err(|err| Error::Custom(err.to_string()))?.map_err(|err| { - let code = match err { - EngineApiError::InvalidParams => INVALID_PARAMS_CODE, - EngineApiError::PayloadUnknown => UNKNOWN_PAYLOAD_CODE, - EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, - // Any other server error - _ => jsonrpsee::types::error::INTERNAL_ERROR_CODE, - }; - rpc_err(code, err.to_string(), None) - }) + rx.await.map_err(|err| Error::Custom(err.to_string()))?.map_err(|err| to_rpc_error(err)) } } @@ -62,23 +98,27 @@ impl EngineApiServer for EngineApi { /// See also /// Caution: This should not accept the `withdrawals` field async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request( - EngineApiMessage::NewPayload(EngineApiMessageVersion::V1, payload, tx), - rx, + self.validate_withdrawals_presence( + EngineApiMessageVersion::V1, + payload.timestamp.as_u64(), + payload.withdrawals.is_some(), ) - .await + .map_err(to_rpc_error)?; + let (tx, rx) = oneshot::channel(); + self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await } /// Handler for `engine_getPayloadV2` /// See also async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request( - EngineApiMessage::NewPayload(EngineApiMessageVersion::V2, payload, tx), - rx, + self.validate_withdrawals_presence( + EngineApiMessageVersion::V2, + payload.timestamp.as_u64(), + payload.withdrawals.is_some(), ) - .await + .map_err(to_rpc_error)?; + let (tx, rx) = oneshot::channel(); + self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await } /// Handler for `engine_forkchoiceUpdatedV1` @@ -90,14 +130,17 @@ impl EngineApiServer for EngineApi { fork_choice_state: ForkchoiceState, payload_attributes: Option, ) -> Result { + if let Some(ref attrs) = payload_attributes { + self.validate_withdrawals_presence( + EngineApiMessageVersion::V1, + attrs.timestamp.as_u64(), + attrs.withdrawals.is_some(), + ) + .map_err(to_rpc_error)?; + } let (tx, rx) = oneshot::channel(); self.delegate_request( - EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V1, - fork_choice_state, - payload_attributes, - tx, - ), + EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx), rx, ) .await @@ -110,14 +153,17 @@ impl EngineApiServer for EngineApi { fork_choice_state: ForkchoiceState, payload_attributes: Option, ) -> Result { + if let Some(ref attrs) = payload_attributes { + self.validate_withdrawals_presence( + EngineApiMessageVersion::V2, + attrs.timestamp.as_u64(), + attrs.withdrawals.is_some(), + ) + .map_err(to_rpc_error)?; + } let (tx, rx) = oneshot::channel(); self.delegate_request( - EngineApiMessage::ForkchoiceUpdated( - EngineApiMessageVersion::V2, - fork_choice_state, - payload_attributes, - tx, - ), + EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx), rx, ) .await diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 2223527f4..d250c95d0 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -2,7 +2,7 @@ use reth_primitives::BlockNumber; /// Determines the control flow during pipeline execution. #[derive(Debug, Eq, PartialEq)] -pub(crate) enum ControlFlow { +pub enum ControlFlow { /// An unwind was requested and must be performed before continuing. Unwind { /// The block to unwind to. @@ -22,7 +22,7 @@ pub(crate) enum ControlFlow { } impl ControlFlow { - pub(crate) fn should_continue(&self) -> bool { + pub fn should_continue(&self) -> bool { matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. }) } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 67ed092c4..de9c2fc92 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,4 +1,5 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; +use futures_util::Future; use reth_db::database::Database; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_primitives::{BlockNumber, H256}; @@ -6,6 +7,7 @@ use reth_provider::Transaction; use std::{ fmt::{Debug, Formatter}, ops::Deref, + pin::Pin, sync::Arc, }; use tokio::sync::watch; @@ -86,6 +88,13 @@ pub struct Pipeline { metrics: Metrics, } +/// The future that returns the owned pipeline and the result of the pipeline run. See +/// [Pipeline::run_as_fut]. +pub type PipelineFut = Pin> + Send>>; + +/// The pipeline type itself with the result of [Pipeline::run_as_fut] +pub type PipelineWithResult = (Pipeline, Result); + impl Default for Pipeline { fn default() -> Self { Self { @@ -109,12 +118,21 @@ impl Debug for Pipeline { } } -impl Pipeline { +impl Pipeline +where + DB: Database + 'static, + U: SyncStateUpdater + 'static, +{ /// Construct a pipeline using a [`PipelineBuilder`]. pub fn builder() -> PipelineBuilder { PipelineBuilder::default() } + /// Return the minimum pipeline progress + pub fn minimum_progress(&self) -> &Option { + &self.progress.minimum_progress + } + /// Set tip for reverse sync. pub fn set_tip(&self, tip: H256) { self.tip_tx.as_ref().expect("tip sender is set").send(tip).expect("tip channel closed"); @@ -138,13 +156,27 @@ impl Pipeline { } } + /// Consume the pipeline and run it. Return the pipeline and its result as a future. + pub fn run_as_fut(mut self, db: Arc, tip: H256) -> PipelineFut { + // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for + // updating metrics. + self.register_metrics(db.clone()); + + Box::pin(async move { + self.set_tip(tip); + let result = self.run_loop(db).await; + trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished"); + (self, result) + }) + } + /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. pub async fn run(&mut self, db: Arc) -> Result<(), PipelineError> { self.register_metrics(db.clone()); loop { - let next_action = self.run_loop(db.as_ref()).await?; + let next_action = self.run_loop(db.clone()).await?; // Terminate the loop early if it's reached the maximum user // configured block. @@ -172,7 +204,7 @@ impl Pipeline { /// If any stage is unsuccessful at execution, we proceed to /// unwind. This will undo the progress across the entire pipeline /// up to the block that caused the error. - async fn run_loop(&mut self, db: &DB) -> Result { + async fn run_loop(&mut self, db: Arc) -> Result { let mut previous_stage = None; for stage_index in 0..self.stages.len() { let stage = &self.stages[stage_index]; @@ -186,7 +218,7 @@ impl Pipeline { trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); let next = self - .execute_stage_to_completion(db, previous_stage, stage_index) + .execute_stage_to_completion(db.as_ref(), previous_stage, stage_index) .instrument(info_span!("execute", stage = %stage_id)) .await?; @@ -202,7 +234,7 @@ impl Pipeline { if let Some(ref updater) = self.sync_state_updater { updater.update_sync_state(SyncState::Downloading { target_block: target }); } - self.unwind(db, target, bad_block).await?; + self.unwind(db.as_ref(), target, bad_block).await?; return Ok(ControlFlow::Unwind { target, bad_block }) } } diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 9d0a4fa40..75b60b9ab 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -45,7 +45,7 @@ pub fn assert_genesis_block(tx: &Transaction<'_, DB>, g: SealedBlo } /// Test chain with genesis, blocks, execution results -/// that have correcte changesets. +/// that have valid changesets. pub struct BlockChainTestData { /// Genesis pub genesis: SealedBlock,