feat: refactor and integrate local engine into EngineNodeLauncher (#11703)

This commit is contained in:
Arsenii Kulikov
2024-10-14 17:21:41 +04:00
committed by GitHub
parent c05a900542
commit 523bfb9c81
16 changed files with 504 additions and 404 deletions

17
Cargo.lock generated
View File

@ -6971,6 +6971,7 @@ dependencies = [
"reth", "reth",
"reth-chainspec", "reth-chainspec",
"reth-db", "reth-db",
"reth-engine-local",
"reth-network-peers", "reth-network-peers",
"reth-node-builder", "reth-node-builder",
"reth-node-ethereum", "reth-node-ethereum",
@ -7028,22 +7029,22 @@ dependencies = [
"alloy-rpc-types-engine", "alloy-rpc-types-engine",
"eyre", "eyre",
"futures-util", "futures-util",
"op-alloy-rpc-types-engine",
"reth-beacon-consensus", "reth-beacon-consensus",
"reth-chain-state",
"reth-chainspec", "reth-chainspec",
"reth-config", "reth-consensus",
"reth-db", "reth-engine-primitives",
"reth-engine-service",
"reth-engine-tree", "reth-engine-tree",
"reth-ethereum-engine-primitives", "reth-ethereum-engine-primitives",
"reth-exex-test-utils", "reth-evm",
"reth-node-types",
"reth-payload-builder", "reth-payload-builder",
"reth-payload-primitives", "reth-payload-primitives",
"reth-primitives", "reth-payload-validator",
"reth-provider", "reth-provider",
"reth-prune", "reth-prune",
"reth-rpc-types-compat",
"reth-stages-api", "reth-stages-api",
"reth-tracing",
"reth-transaction-pool", "reth-transaction-pool",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -7809,6 +7810,7 @@ dependencies = [
"reth-db-api", "reth-db-api",
"reth-db-common", "reth-db-common",
"reth-downloaders", "reth-downloaders",
"reth-engine-local",
"reth-engine-service", "reth-engine-service",
"reth-engine-tree", "reth-engine-tree",
"reth-engine-util", "reth-engine-util",
@ -8131,6 +8133,7 @@ dependencies = [
"reth-db", "reth-db",
"reth-discv5", "reth-discv5",
"reth-e2e-test-utils", "reth-e2e-test-utils",
"reth-engine-local",
"reth-evm", "reth-evm",
"reth-network", "reth-network",
"reth-node-api", "reth-node-api",

View File

@ -150,15 +150,18 @@ pub enum ForkchoiceStatus {
} }
impl ForkchoiceStatus { impl ForkchoiceStatus {
pub(crate) const fn is_valid(&self) -> bool { /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Valid`].
pub const fn is_valid(&self) -> bool {
matches!(self, Self::Valid) matches!(self, Self::Valid)
} }
pub(crate) const fn is_invalid(&self) -> bool { /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Invalid`].
pub const fn is_invalid(&self) -> bool {
matches!(self, Self::Invalid) matches!(self, Self::Invalid)
} }
pub(crate) const fn is_syncing(&self) -> bool { /// Returns `true` if the forkchoice state is [`ForkchoiceStatus::Syncing`].
pub const fn is_syncing(&self) -> bool {
matches!(self, Self::Syncing) matches!(self, Self::Syncing)
} }

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies] [dependencies]
reth.workspace = true reth.workspace = true
reth-chainspec.workspace = true reth-chainspec.workspace = true
reth-engine-local.workspace = true
reth-primitives.workspace = true reth-primitives.workspace = true
reth-tracing.workspace = true reth-tracing.workspace = true
reth-db = { workspace = true, features = ["test-utils"] } reth-db = { workspace = true, features = ["test-utils"] }

View File

@ -11,15 +11,19 @@ exclude.workspace = true
[dependencies] [dependencies]
# reth # reth
reth-beacon-consensus.workspace = true reth-beacon-consensus.workspace = true
reth-chain-state.workspace = true reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-engine-service.workspace = true
reth-engine-tree.workspace = true reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-ethereum-engine-primitives.workspace = true reth-ethereum-engine-primitives.workspace = true
reth-node-types.workspace = true
reth-payload-builder.workspace = true reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true reth-payload-primitives.workspace = true
reth-primitives.workspace = true reth-payload-validator.workspace = true
reth-provider.workspace = true reth-provider.workspace = true
reth-prune.workspace = true reth-prune.workspace = true
reth-rpc-types-compat.workspace = true
reth-transaction-pool.workspace = true reth-transaction-pool.workspace = true
reth-stages-api.workspace = true reth-stages-api.workspace = true
@ -36,16 +40,10 @@ futures-util.workspace = true
eyre.workspace = true eyre.workspace = true
tracing.workspace = true tracing.workspace = true
[dev-dependencies] op-alloy-rpc-types-engine = { workspace = true, optional = true }
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-config.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-ethereum-engine-primitives.workspace = true
reth-exex-test-utils.workspace = true
reth-payload-builder = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
[lints] [lints]
workspace = true workspace = true
[features]
optimism = ["op-alloy-rpc-types-engine"]

View File

@ -1,4 +1,17 @@
//! A local engine service that can be used to drive a dev chain. //! A local engine service that can be used to drive a dev chain.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod miner; pub mod miner;
pub mod payload; pub mod payload;
pub mod service; pub mod service;
pub use miner::MiningMode;
pub use payload::LocalPayloadAttributesBuilder;
pub use service::LocalEngineService;

View File

@ -1,16 +1,31 @@
//! Contains the implementation of the mining mode for the local engine. //! Contains the implementation of the mining mode for the local engine.
use alloy_primitives::TxHash; use alloy_primitives::{TxHash, B256};
use alloy_rpc_types_engine::{CancunPayloadFields, ForkchoiceState};
use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt}; use futures_util::{stream::Fuse, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::EngineTypes;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes,
};
use reth_provider::{BlockReader, ChainSpecProvider};
use reth_rpc_types_compat::engine::payload::block_to_payload;
use reth_transaction_pool::TransactionPool; use reth_transaction_pool::TransactionPool;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::{Duration, UNIX_EPOCH},
};
use tokio::{
sync::{mpsc::UnboundedSender, oneshot},
time::Interval,
}; };
use tokio::time::Interval;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
/// A mining mode for the local dev engine. /// A mining mode for the local dev engine.
#[derive(Debug)] #[derive(Debug)]
@ -58,3 +73,175 @@ impl Future for MiningMode {
} }
} }
} }
/// Local miner advancing the chain/
#[derive(Debug)]
pub struct LocalMiner<EngineT: EngineTypes, Provider, B> {
/// Provider to read the current tip of the chain.
provider: Provider,
/// The payload attribute builder for the engine
payload_attributes_builder: B,
/// Sender for events to engine.
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
/// The mining mode for the engine
mode: MiningMode,
/// The payload builder for the engine
payload_builder: PayloadBuilderHandle<EngineT>,
/// Timestamp for the next block.
last_timestamp: u64,
/// Stores latest mined blocks.
last_block_hashes: Vec<B256>,
}
impl<EngineT, Provider, B> LocalMiner<EngineT, Provider, B>
where
EngineT: EngineTypes,
Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks> + 'static,
B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
{
/// Spawns a new [`LocalMiner`] with the given parameters.
pub fn spawn_new(
provider: Provider,
payload_attributes_builder: B,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
mode: MiningMode,
payload_builder: PayloadBuilderHandle<EngineT>,
) {
let latest_header =
provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
let miner = Self {
provider,
payload_attributes_builder,
to_engine,
mode,
payload_builder,
last_timestamp: latest_header.timestamp,
last_block_hashes: vec![latest_header.hash()],
};
// Spawn the miner
tokio::spawn(miner.run());
}
/// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
async fn run(mut self) {
let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
// Wait for the interval or the pool to receive a transaction
_ = &mut self.mode => {
if let Err(e) = self.advance().await {
error!(target: "engine::local", "Error advancing the chain: {:?}", e);
}
}
// send FCU once in a while
_ = fcu_interval.tick() => {
if let Err(e) = self.update_forkchoice_state().await {
error!(target: "engine::local", "Error updating fork choice: {:?}", e);
}
}
}
}
}
/// Returns current forkchoice state.
fn forkchoice_state(&self) -> ForkchoiceState {
ForkchoiceState {
head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
safe_block_hash: *self
.last_block_hashes
.get(self.last_block_hashes.len().saturating_sub(32))
.expect("at least 1 block exists"),
finalized_block_hash: *self
.last_block_hashes
.get(self.last_block_hashes.len().saturating_sub(64))
.expect("at least 1 block exists"),
}
}
/// Sends a FCU to the engine.
async fn update_forkchoice_state(&self) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: None,
tx,
})?;
let res = rx.await??;
if !res.forkchoice_status().is_valid() {
eyre::bail!("Invalid fork choice update")
}
Ok(())
}
/// Generates payload attributes for a new block, passes them to FCU and inserts built payload
/// through newPayload.
async fn advance(&mut self) -> eyre::Result<()> {
let timestamp = std::cmp::max(
self.last_timestamp + 1,
std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("cannot be earlier than UNIX_EPOCH")
.as_secs(),
);
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
tx,
})?;
let res = rx.await??.await?;
if !res.payload_status.is_valid() {
eyre::bail!("Invalid payload status")
}
let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
// wait for some time to let the payload be built
tokio::time::sleep(Duration::from_millis(200)).await;
let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else {
eyre::bail!("No payload")
};
let block = payload.block();
let cancun_fields =
if self.provider.chain_spec().is_cancun_active_at_timestamp(block.timestamp) {
Some(CancunPayloadFields {
parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
versioned_hashes: block.blob_versioned_hashes().into_iter().copied().collect(),
})
} else {
None
};
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::NewPayload {
payload: block_to_payload(payload.block().clone()),
cancun_fields,
tx,
})?;
let res = rx.await??;
if !res.is_valid() {
eyre::bail!("Invalid payload")
}
self.last_timestamp = timestamp;
self.last_block_hashes.push(block.hash());
// ensure we keep at most 64 blocks
if self.last_block_hashes.len() > 64 {
self.last_block_hashes =
self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
}
Ok(())
}
}

View File

@ -2,29 +2,62 @@
//! [`LocalEngineService`](super::service::LocalEngineService). //! [`LocalEngineService`](super::service::LocalEngineService).
use alloy_primitives::{Address, B256}; use alloy_primitives::{Address, B256};
use reth_chainspec::EthereumHardforks;
use reth_ethereum_engine_primitives::EthPayloadAttributes; use reth_ethereum_engine_primitives::EthPayloadAttributes;
use reth_payload_primitives::PayloadAttributesBuilder; use reth_payload_primitives::PayloadAttributesBuilder;
use std::{convert::Infallible, time::UNIX_EPOCH}; use std::sync::Arc;
/// The attributes builder for local Ethereum payload. /// The attributes builder for local Ethereum payload.
#[derive(Debug)] #[derive(Debug)]
pub struct EthLocalPayloadAttributesBuilder; #[non_exhaustive]
pub struct LocalPayloadAttributesBuilder<ChainSpec> {
chain_spec: Arc<ChainSpec>,
}
impl PayloadAttributesBuilder for EthLocalPayloadAttributesBuilder { impl<ChainSpec> LocalPayloadAttributesBuilder<ChainSpec> {
type PayloadAttributes = EthPayloadAttributes; /// Creates a new instance of the builder.
type Error = Infallible; pub const fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec }
fn build(&self) -> Result<Self::PayloadAttributes, Self::Error> { }
let ts = std::time::SystemTime::now() }
.duration_since(UNIX_EPOCH)
.expect("cannot be earlier than UNIX_EPOCH"); impl<ChainSpec> PayloadAttributesBuilder<EthPayloadAttributes>
for LocalPayloadAttributesBuilder<ChainSpec>
Ok(EthPayloadAttributes { where
timestamp: ts.as_secs(), ChainSpec: Send + Sync + EthereumHardforks + 'static,
prev_randao: B256::random(), {
suggested_fee_recipient: Address::random(), fn build(&self, timestamp: u64) -> EthPayloadAttributes {
withdrawals: None, EthPayloadAttributes {
parent_beacon_block_root: None, timestamp,
}) prev_randao: B256::random(),
suggested_fee_recipient: Address::random(),
withdrawals: if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
Some(Default::default())
} else {
None
},
parent_beacon_block_root: if self.chain_spec.is_cancun_active_at_timestamp(timestamp) {
Some(B256::random())
} else {
None
},
}
}
}
#[cfg(feature = "optimism")]
impl<ChainSpec> PayloadAttributesBuilder<op_alloy_rpc_types_engine::OpPayloadAttributes>
for LocalPayloadAttributesBuilder<ChainSpec>
where
ChainSpec: Send + Sync + EthereumHardforks + 'static,
{
fn build(&self, timestamp: u64) -> op_alloy_rpc_types_engine::OpPayloadAttributes {
op_alloy_rpc_types_engine::OpPayloadAttributes {
payload_attributes: self.build(timestamp),
transactions: None,
no_tx_pool: None,
gas_limit: None,
eip_1559_params: None,
}
} }
} }

View File

@ -6,357 +6,154 @@
//! with a single transaction. The `Interval` mode will initiate block //! with a single transaction. The `Interval` mode will initiate block
//! building at a fixed interval. //! building at a fixed interval.
use crate::miner::MiningMode; use core::fmt;
use eyre::eyre; use std::{
use reth_beacon_consensus::EngineNodeTypes; fmt::{Debug, Formatter},
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain}; pin::Pin,
use reth_engine_tree::persistence::PersistenceHandle; sync::Arc,
use reth_payload_builder::PayloadBuilderHandle; task::{Context, Poll},
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadBuilderAttributes, PayloadTypes,
}; };
use reth_provider::ProviderFactory;
use crate::miner::{LocalMiner, MiningMode};
use futures_util::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineNodeTypes};
use reth_chainspec::EthChainSpec;
use reth_consensus::Consensus;
use reth_engine_service::service::EngineMessageStream;
use reth_engine_tree::{
chain::{ChainEvent, HandlerEvent},
engine::{
EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
RequestHandlerEvent,
},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
use reth_evm::execute::BlockExecutorProvider;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_provider::{providers::BlockchainProvider2, ChainSpecProvider, ProviderFactory};
use reth_prune::PrunerWithFactory; use reth_prune::PrunerWithFactory;
use reth_stages_api::MetricEventsSender; use reth_stages_api::MetricEventsSender;
use tokio::sync::oneshot; use tokio::sync::mpsc::UnboundedSender;
use tracing::debug; use tracing::error;
/// Provides a local dev service engine that can be used to drive the /// Provides a local dev service engine that can be used to drive the
/// chain forward. /// chain forward.
#[derive(Debug)] ///
pub struct LocalEngineService<N, B> /// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow
/// modifications of the stream
pub struct LocalEngineService<N>
where where
N: EngineNodeTypes, N: EngineNodeTypes,
B: PayloadAttributesBuilder<PayloadAttributes = <N::Engine as PayloadTypes>::PayloadAttributes>,
{ {
/// The payload builder for the engine /// Processes requests.
payload_builder: PayloadBuilderHandle<N::Engine>, ///
/// The payload attribute builder for the engine /// This type is responsible for processing incoming requests.
payload_attributes_builder: B, handler: EngineApiRequestHandler<EngineApiRequest<N::Engine>>,
/// Keep track of the Canonical chain state that isn't persisted on disk yet /// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
canonical_in_memory_state: CanonicalInMemoryState, incoming_requests: EngineMessageStream<N::Engine>,
/// A handle to the persistence layer
persistence_handle: PersistenceHandle,
/// The mining mode for the engine
mode: MiningMode,
} }
impl<N, B> LocalEngineService<N, B> impl<N> LocalEngineService<N>
where where
N: EngineNodeTypes, N: EngineNodeTypes,
B: PayloadAttributesBuilder<PayloadAttributes = <N::Engine as PayloadTypes>::PayloadAttributes>,
{ {
/// Constructor for [`LocalEngineService`]. /// Constructor for [`LocalEngineService`].
pub fn new( #[allow(clippy::too_many_arguments)]
payload_builder: PayloadBuilderHandle<N::Engine>, pub fn new<B>(
payload_attributes_builder: B, consensus: Arc<dyn Consensus>,
executor_factory: impl BlockExecutorProvider,
provider: ProviderFactory<N>, provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider2<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>, pruner: PrunerWithFactory<ProviderFactory<N>>,
canonical_in_memory_state: CanonicalInMemoryState, payload_builder: PayloadBuilderHandle<N::Engine>,
tree_config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
sync_metrics_tx: MetricEventsSender, sync_metrics_tx: MetricEventsSender,
to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
from_engine: EngineMessageStream<N::Engine>,
mode: MiningMode, mode: MiningMode,
) -> Self { payload_attributes_builder: B,
) -> Self
where
B: PayloadAttributesBuilder<<N::Engine as PayloadTypes>::PayloadAttributes>,
{
let chain_spec = provider.chain_spec();
let engine_kind =
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let persistence_handle = let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);
let payload_validator = ExecutionPayloadValidator::new(chain_spec);
Self { let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
payload_builder,
payload_attributes_builder, let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
canonical_in_memory_state, blockchain_db.clone(),
executor_factory,
consensus,
payload_validator,
persistence_handle, persistence_handle,
mode, payload_builder.clone(),
} canonical_in_memory_state,
} tree_config,
invalid_block_hook,
engine_kind,
);
/// Spawn the [`LocalEngineService`] on a tokio green thread. The service will poll the payload let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
/// builder with two varying modes, [`MiningMode::Instant`] or [`MiningMode::Interval`]
/// which will respectively either execute the block as soon as it finds a LocalMiner::spawn_new(
/// transaction in the pool or build the block based on an interval. blockchain_db,
pub fn spawn_new(
payload_builder: PayloadBuilderHandle<N::Engine>,
payload_attributes_builder: B,
provider: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
canonical_in_memory_state: CanonicalInMemoryState,
sync_metrics_tx: MetricEventsSender,
mode: MiningMode,
) {
let engine = Self::new(
payload_builder,
payload_attributes_builder, payload_attributes_builder,
provider, to_engine,
pruner,
canonical_in_memory_state,
sync_metrics_tx,
mode, mode,
payload_builder,
); );
// Spawn the engine Self { handler, incoming_requests: from_engine }
tokio::spawn(engine.run());
}
/// Runs the [`LocalEngineService`] in a loop, polling the miner and building
/// payloads.
async fn run(mut self) {
loop {
// Wait for the interval or the pool to receive a transaction
(&mut self.mode).await;
// Start a new payload building job
let executed_block = self.build_and_save_payload().await;
if executed_block.is_err() {
debug!(target: "local_engine", err = ?executed_block.unwrap_err(), "failed payload building");
continue
}
let block = executed_block.expect("not error");
let res = self.update_canonical_in_memory_state(block);
if res.is_err() {
debug!(target: "local_engine", err = ?res.unwrap_err(), "failed canonical state update");
}
}
}
/// Builds a payload by initiating a new payload job via the [`PayloadBuilderHandle`],
/// saving the execution outcome to persistence and returning the executed block.
async fn build_and_save_payload(&self) -> eyre::Result<ExecutedBlock> {
let payload_attributes = self.payload_attributes_builder.build()?;
let parent = self.canonical_in_memory_state.get_canonical_head().hash();
let payload_builder_attributes =
<N::Engine as PayloadTypes>::PayloadBuilderAttributes::try_new(
parent,
payload_attributes,
)
.map_err(|_| eyre::eyre!("failed to fetch payload attributes"))?;
let payload = self
.payload_builder
.send_and_resolve_payload(payload_builder_attributes)
.await?
.await?;
let executed_block =
payload.executed_block().ok_or_else(|| eyre!("missing executed block"))?;
let (tx, rx) = oneshot::channel();
let _ = self.persistence_handle.save_blocks(vec![executed_block.clone()], tx);
// Wait for the persistence_handle to complete
let _ = rx.await?.ok_or_else(|| eyre!("missing new head"))?;
Ok(executed_block)
}
/// Update the canonical in memory state and send notification for a new canon state to
/// all the listeners.
fn update_canonical_in_memory_state(&self, executed_block: ExecutedBlock) -> eyre::Result<()> {
let chain = NewCanonicalChain::Commit { new: vec![executed_block] };
let tip = chain.tip().header.clone();
let notification = chain.to_chain_notification();
// Update the tracked in-memory state with the new chain
self.canonical_in_memory_state.update_chain(chain);
self.canonical_in_memory_state.set_canonical_head(tip);
// Sends an event to all active listeners about the new canonical chain
self.canonical_in_memory_state.notify_canon_state(notification);
Ok(())
} }
} }
#[cfg(test)] impl<N> Stream for LocalEngineService<N>
mod tests { where
use super::*; N: EngineNodeTypes,
use reth_chainspec::MAINNET; {
use reth_config::PruneConfig; type Item = ChainEvent<BeaconConsensusEngineEvent>;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex_test_utils::TestNode;
use reth_node_types::NodeTypesWithDBAdapter;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_provider::{providers::StaticFileProvider, BlockReader, ProviderFactory};
use reth_prune::PrunerBuilder;
use reth_transaction_pool::{
test_utils::{testing_pool, MockTransaction},
TransactionPool,
};
use std::{convert::Infallible, time::Duration};
use tokio::sync::mpsc::unbounded_channel;
#[derive(Debug)] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
struct TestPayloadAttributesBuilder; let this = self.get_mut();
impl PayloadAttributesBuilder for TestPayloadAttributesBuilder { if let Poll::Ready(ev) = this.handler.poll(cx) {
type PayloadAttributes = alloy_rpc_types_engine::PayloadAttributes; return match ev {
type Error = Infallible; RequestHandlerEvent::HandlerEvent(ev) => match ev {
HandlerEvent::BackfillAction(_) => {
fn build(&self) -> Result<Self::PayloadAttributes, Self::Error> { error!(target: "engine::local", "received backfill request in local engine");
Ok(alloy_rpc_types_engine::PayloadAttributes { Poll::Ready(Some(ChainEvent::FatalError))
timestamp: 0, }
prev_randao: Default::default(), HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
suggested_fee_recipient: Default::default(), HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
withdrawals: None, },
parent_beacon_block_root: None, RequestHandlerEvent::Download(_) => {
}) error!(target: "engine::local", "received download request in local engine");
Poll::Ready(Some(ChainEvent::FatalError))
}
}
} }
}
#[tokio::test] // forward incoming requests to the handler
async fn test_local_engine_service_interval() -> eyre::Result<()> { while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
reth_tracing::init_test_tracing(); this.handler.on_event(FromEngine::Request(req.into()));
}
// Start the provider and the pruner Poll::Pending
let (_, static_dir_path) = create_test_static_files_dir(); }
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new( }
create_test_rw_db(),
MAINNET.clone(), impl<N: EngineNodeTypes> Debug for LocalEngineService<N> {
StaticFileProvider::read_write(static_dir_path)?, fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
); f.debug_struct("LocalEngineService").finish_non_exhaustive()
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());
// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();
// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();
// Launch the LocalEngineService in interval mode
let period = Duration::from_secs(1);
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
MiningMode::interval(period),
);
// Check that we have no block for now
let block = provider.block_by_number(0)?;
assert!(block.is_none());
// Wait 4 intervals
tokio::time::sleep(2 * period).await;
// Assert a block has been build
let block = provider.block_by_number(0)?;
assert!(block.is_some());
Ok(())
}
#[tokio::test]
async fn test_local_engine_service_instant() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Start the provider and the pruner
let (_, static_dir_path) = create_test_static_files_dir();
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path)?,
);
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());
// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();
// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
// Start a transaction pool
let pool = testing_pool();
// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();
// Launch the LocalEngineService in instant mode
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
MiningMode::instant(pool.clone()),
);
// Wait for a small period to assert block building is
// triggered by adding a transaction to the pool
let period = Duration::from_millis(500);
tokio::time::sleep(period).await;
let block = provider.block_by_number(0)?;
assert!(block.is_none());
// Add a transaction to the pool
let transaction = MockTransaction::legacy().with_gas_price(10);
pool.add_transaction(Default::default(), transaction).await?;
// Wait for block building
let period = Duration::from_secs(2);
tokio::time::sleep(period).await;
// Assert a block has been build
let block = provider.block_by_number(0)?;
assert!(block.is_some());
Ok(())
}
#[tokio::test]
async fn test_canonical_chain_subscription() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Start the provider and the pruner
let (_, static_dir_path) = create_test_static_files_dir();
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path)?,
);
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());
// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();
let mut notifications = canonical_in_memory_state.subscribe_canon_state();
// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
// Start a transaction pool
let pool = testing_pool();
// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();
// Launch the LocalEngineService in instant mode
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
MiningMode::instant(pool.clone()),
);
// Add a transaction to the pool
let transaction = MockTransaction::legacy().with_gas_price(10);
pool.add_transaction(Default::default(), transaction).await?;
// Check a notification is received for block 0
let res = notifications.recv().await?;
assert_eq!(res.tip().number, 0);
Ok(())
} }
} }

View File

@ -31,7 +31,7 @@ use std::{
}; };
/// Alias for consensus engine stream. /// Alias for consensus engine stream.
type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>; pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
/// Alias for chain orchestrator. /// Alias for chain orchestrator.
type EngineServiceType<N, Client> = ChainOrchestrator< type EngineServiceType<N, Client> = ChainOrchestrator<

View File

@ -3,29 +3,64 @@ use std::sync::Arc;
use alloy_genesis::Genesis; use alloy_genesis::Genesis;
use alloy_primitives::{b256, hex}; use alloy_primitives::{b256, hex};
use futures::StreamExt; use futures::StreamExt;
use reth::core::rpc::eth::helpers::EthTransactions; use reth::{args::DevArgs, core::rpc::eth::helpers::EthTransactions};
use reth_chainspec::ChainSpec; use reth_chainspec::ChainSpec;
use reth_e2e_test_utils::setup; use reth_e2e_test_utils::setup;
use reth_provider::CanonStateSubscriptions; use reth_node_api::{FullNodeComponents, NodeAddOns};
use reth_node_builder::{EngineNodeLauncher, FullNode, NodeBuilder, NodeConfig, NodeHandle};
use crate::utils::EthNode; use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
use reth_tasks::TaskManager;
#[tokio::test] #[tokio::test]
async fn can_run_dev_node() -> eyre::Result<()> { async fn can_run_dev_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing(); reth_tracing::init_test_tracing();
let (mut nodes, _tasks, _) = setup(1, custom_chain(), true).await?; let (mut nodes, _tasks, _) = setup::<EthereumNode>(1, custom_chain(), true).await?;
assert_chain_advances(nodes.pop().unwrap()).await; assert_chain_advances(nodes.pop().unwrap().inner).await;
Ok(()) Ok(())
} }
async fn assert_chain_advances(node: EthNode) { #[tokio::test]
let mut notifications = node.inner.provider.canonical_state_stream(); async fn can_run_dev_node_new_engine() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
builder.task_executor().clone(),
builder.config().datadir(),
Default::default(),
);
builder.launch_with(launcher)
})
.await?;
assert_chain_advances(node).await;
Ok(())
}
async fn assert_chain_advances<N, AddOns>(node: FullNode<N, AddOns>)
where
N: FullNodeComponents<Provider: CanonStateSubscriptions>,
AddOns: NodeAddOns<N, EthApi: EthTransactions>,
{
let mut notifications = node.provider.canonical_state_stream();
// submit tx through rpc // submit tx through rpc
let raw_tx = hex!("02f876820a28808477359400847735940082520894ab0840c0e43688012c1adb0f5e3fc665188f83d28a029d394a5d630544000080c080a0a044076b7e67b5deecc63f61a8d7913fab86ca365b344b5759d1fe3563b4c39ea019eab979dd000da04dfc72bb0377c092d30fd9e1cab5ae487de49586cc8b0090"); let raw_tx = hex!("02f876820a28808477359400847735940082520894ab0840c0e43688012c1adb0f5e3fc665188f83d28a029d394a5d630544000080c080a0a044076b7e67b5deecc63f61a8d7913fab86ca365b344b5759d1fe3563b4c39ea019eab979dd000da04dfc72bb0377c092d30fd9e1cab5ae487de49586cc8b0090");
let eth_api = node.inner.rpc_registry.eth_api(); let eth_api = node.rpc_registry.eth_api();
let hash = eth_api.send_raw_transaction(raw_tx.into()).await.unwrap(); let hash = eth_api.send_raw_transaction(raw_tx.into()).await.unwrap();

View File

@ -1,12 +1,7 @@
use alloy_primitives::{Address, B256}; use alloy_primitives::{Address, B256};
use reth::rpc::types::engine::PayloadAttributes; use reth::rpc::types::engine::PayloadAttributes;
use reth_e2e_test_utils::NodeHelperType;
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_payload_builder::EthPayloadBuilderAttributes; use reth_payload_builder::EthPayloadBuilderAttributes;
/// Ethereum Node Helper type
pub(crate) type EthNode = NodeHelperType<EthereumNode, EthereumAddOns>;
/// Helper function to create a new eth payload attributes /// Helper function to create a new eth payload attributes
pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttributes { pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = PayloadAttributes { let attributes = PayloadAttributes {

View File

@ -26,6 +26,7 @@ reth-db = { workspace = true, features = ["mdbx"], optional = true }
reth-db-api.workspace = true reth-db-api.workspace = true
reth-db-common.workspace = true reth-db-common.workspace = true
reth-downloaders.workspace = true reth-downloaders.workspace = true
reth-engine-local.workspace = true
reth-engine-service.workspace = true reth-engine-service.workspace = true
reth-engine-tree.workspace = true reth-engine-tree.workspace = true
reth-engine-util.workspace = true reth-engine-util.workspace = true

View File

@ -9,6 +9,7 @@ use reth_beacon_consensus::{
use reth_blockchain_tree::BlockchainTreeConfig; use reth_blockchain_tree::BlockchainTreeConfig;
use reth_chainspec::EthChainSpec; use reth_chainspec::EthChainSpec;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider}; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider};
use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder, MiningMode};
use reth_engine_service::service::{ChainEvent, EngineService}; use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{ use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler}, engine::{EngineApiRequest, EngineRequestHandler},
@ -18,7 +19,10 @@ use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle; use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{BuiltPayload, FullNodeTypes, NodeAddOns, NodeTypesWithEngine}; use reth_node_api::{
BuiltPayload, FullNodeTypes, NodeAddOns, NodeTypesWithEngine, PayloadAttributesBuilder,
PayloadTypes,
};
use reth_node_core::{ use reth_node_core::{
dirs::{ChainPath, DataDirPath}, dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture, exit::NodeExitFuture,
@ -80,6 +84,9 @@ where
+ FullEthApiServer + FullEthApiServer
+ AddDevSigners, + AddDevSigners,
>, >,
LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
<<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
>,
{ {
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>; type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
@ -210,23 +217,49 @@ where
let pruner_events = pruner.events(); let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
// Configure the consensus engine let mut engine_service = if ctx.is_dev() {
let mut eth_service = EngineService::new( let mining_mode = if let Some(block_time) = ctx.node_config().dev.block_time {
ctx.consensus(), MiningMode::interval(block_time)
ctx.components().block_executor().clone(), } else {
ctx.chain_spec(), MiningMode::instant(ctx.components().pool().clone())
network_client.clone(), };
Box::pin(consensus_engine_stream), let eth_service = LocalEngineService::new(
pipeline, ctx.consensus(),
Box::new(ctx.task_executor().clone()), ctx.components().block_executor().clone(),
ctx.provider_factory().clone(), ctx.provider_factory().clone(),
ctx.blockchain_db().clone(), ctx.blockchain_db().clone(),
pruner, pruner,
ctx.components().payload_builder().clone(), ctx.components().payload_builder().clone(),
engine_tree_config, engine_tree_config,
ctx.invalid_block_hook()?, ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(), ctx.sync_metrics_tx(),
); consensus_engine_tx.clone(),
Box::pin(consensus_engine_stream),
mining_mode,
LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
);
Either::Left(eth_service)
} else {
let eth_service = EngineService::new(
ctx.consensus(),
ctx.components().block_executor().clone(),
ctx.chain_spec(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
Box::new(ctx.task_executor().clone()),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder().clone(),
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
);
Either::Right(eth_service)
};
let event_sender = EventSender::default(); let event_sender = EventSender::default();
@ -340,7 +373,9 @@ where
ctx.task_executor().spawn_critical("consensus engine", async move { ctx.task_executor().spawn_critical("consensus engine", async move {
if let Some(initial_target) = initial_target { if let Some(initial_target) = initial_target {
debug!(target: "reth::cli", %initial_target, "start backfill sync"); debug!(target: "reth::cli", %initial_target, "start backfill sync");
eth_service.orchestrator_mut().start_backfill_sync(initial_target); if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().start_backfill_sync(initial_target);
}
} }
let mut res = Ok(()); let mut res = Ok(());
@ -351,10 +386,12 @@ where
payload = built_payloads.select_next_some() => { payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() { if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload");
eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
}
} }
} }
event = eth_service.next() => { event = engine_service.next() => {
let Some(event) = event else { break }; let Some(event) = event else { break };
debug!(target: "reth::cli", "Event: {event}"); debug!(target: "reth::cli", "Event: {event}");
match event { match event {

View File

@ -504,7 +504,7 @@ where
} else if let Some(latest_block) = this.state.latest_block { } else if let Some(latest_block) = this.state.latest_block {
let now = let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now - this.state.latest_block_time.unwrap_or(0) > 60 { if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
// Once we start receiving consensus nodes, don't emit status unless stalled for // Once we start receiving consensus nodes, don't emit status unless stalled for
// 1 minute // 1 minute
info!( info!(

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies] [dependencies]
# reth # reth
reth-chainspec.workspace = true reth-chainspec.workspace = true
reth-engine-local.workspace = true
reth-primitives.workspace = true reth-primitives.workspace = true
reth-payload-builder.workspace = true reth-payload-builder.workspace = true
reth-auto-seal-consensus.workspace = true reth-auto-seal-consensus.workspace = true
@ -87,6 +88,7 @@ optimism = [
"reth-revm/optimism", "reth-revm/optimism",
"reth-auto-seal-consensus/optimism", "reth-auto-seal-consensus/optimism",
"reth-optimism-rpc/optimism", "reth-optimism-rpc/optimism",
"reth-engine-local/optimism",
] ]
asm-keccak = ["reth-primitives/asm-keccak"] asm-keccak = ["reth-primitives/asm-keccak"]
test-utils = ["reth-node-builder/test-utils"] test-utils = ["reth-node-builder/test-utils"]

View File

@ -160,12 +160,7 @@ impl PayloadAttributes for OpPayloadAttributes {
} }
/// A builder that can return the current payload attribute. /// A builder that can return the current payload attribute.
pub trait PayloadAttributesBuilder: std::fmt::Debug + Send + Sync + 'static { pub trait PayloadAttributesBuilder<Attributes>: Send + Sync + 'static {
/// The payload attributes type returned by the builder.
type PayloadAttributes: PayloadAttributes;
/// The error type returned by [`PayloadAttributesBuilder::build`].
type Error: core::error::Error + Send + Sync;
/// Return a new payload attribute from the builder. /// Return a new payload attribute from the builder.
fn build(&self) -> Result<Self::PayloadAttributes, Self::Error>; fn build(&self, timestamp: u64) -> Attributes;
} }