diff --git a/Cargo.lock b/Cargo.lock index 2c587c8c7..fcb0dfa5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7196,6 +7196,7 @@ dependencies = [ "reth-transaction-pool", "tempfile", "tokio", + "tokio-stream", ] [[package]] @@ -7218,6 +7219,7 @@ dependencies = [ "metrics-process", "metrics-util", "once_cell", + "pin-project", "procfs", "proptest", "rand 0.8.5", @@ -7256,6 +7258,7 @@ dependencies = [ "thiserror", "tikv-jemalloc-ctl", "tokio", + "tokio-util", "tracing", "vergen", ] diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 1360c2f1b..947c12745 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -19,7 +19,7 @@ use reth_consensus::Consensus; use reth_db::{init_db, DatabaseEnv}; use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; -use reth_node_core::engine_api_store::{EngineApiStore, StoredEngineApiMessage}; +use reth_node_core::engine::engine_store::{EngineMessageStore, StoredEngineApiMessage}; #[cfg(not(feature = "optimism"))] use reth_node_ethereum::{EthEngineTypes, EthEvmConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; @@ -34,7 +34,7 @@ use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_transaction_pool::noop::NoopTransactionPool; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tracing::*; /// `reth debug replay-engine` command @@ -191,8 +191,7 @@ impl Command { // Configure the consensus engine let network_client = network.fetch_client().await?; - let (consensus_engine_tx, consensus_engine_rx) = mpsc::unbounded_channel(); - let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( + let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::new( network_client, Pipeline::builder().build( provider_factory.clone(), @@ -210,8 +209,6 @@ impl Command { payload_builder, None, u64::MAX, - consensus_engine_tx, - consensus_engine_rx, EngineHooks::new(), )?; info!(target: "reth::cli", "Consensus engine initialized"); @@ -224,7 +221,7 @@ impl Command { let _ = tx.send(res); }); - let engine_api_store = EngineApiStore::new(self.engine_api_store.clone()); + let engine_api_store = EngineMessageStore::new(self.engine_api_store.clone()); for filepath in engine_api_store.engine_messages_iter()? { let contents = fs::read(&filepath).wrap_err(format!("failed to read: {}", filepath.display()))?; diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 839bb0278..a7761615c 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,18 +1,10 @@ -use crate::{ - engine::{ - forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}, - metrics::EngineMetrics, - }, - hooks::{EngineHookContext, EngineHooksController}, - sync::{EngineSyncController, EngineSyncEvent}, -}; -use futures::{Future, StreamExt}; +use futures::{stream::BoxStream, Future, StreamExt}; use reth_db::database::Database; use reth_engine_primitives::{EngineTypes, PayloadAttributes, PayloadBuilderAttributes}; use reth_interfaces::{ blockchain_tree::{ error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind}, - BlockStatus, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk, + BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk, }, executor::BlockValidationError, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, @@ -21,6 +13,7 @@ use reth_interfaces::{ RethError, RethResult, }; use reth_payload_builder::PayloadBuilderHandle; +use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader, B256, @@ -43,7 +36,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, UnboundedSender}, oneshot, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -68,18 +61,19 @@ mod handle; pub use handle::BeaconConsensusEngineHandle; mod forkchoice; -use crate::hooks::{EngineHookEvent, EngineHooks, PolledHook}; pub use forkchoice::ForkchoiceStatus; -use reth_interfaces::blockchain_tree::BlockValidationKind; -use reth_payload_validator::ExecutionPayloadValidator; +use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}; mod metrics; +use metrics::EngineMetrics; pub(crate) mod sync; +use sync::{EngineSyncController, EngineSyncEvent}; /// Hooks for running during the main loop of /// [consensus engine][`crate::engine::BeaconConsensusEngine`]. pub mod hooks; +use hooks::{EngineHookContext, EngineHookEvent, EngineHooks, EngineHooksController, PolledHook}; #[cfg(test)] pub mod test_utils; @@ -180,7 +174,7 @@ where /// Used for emitting updates about whether the engine is syncing or not. sync_state_updater: Box, /// The Engine API message receiver. - engine_message_rx: UnboundedReceiverStream>, + engine_message_stream: BoxStream<'static, BeaconEngineMessage>, /// A clone of the handle handle: BeaconConsensusEngineHandle, /// Tracks the received forkchoice state updates received by the CL. @@ -254,7 +248,7 @@ where target, pipeline_run_threshold, to_engine, - rx, + Box::pin(UnboundedReceiverStream::from(rx)), hooks, ) } @@ -284,7 +278,7 @@ where target: Option, pipeline_run_threshold: u64, to_engine: UnboundedSender>, - rx: UnboundedReceiver>, + engine_message_stream: BoxStream<'static, BeaconEngineMessage>, hooks: EngineHooks, ) -> RethResult<(Self, BeaconConsensusEngineHandle)> { let handle = BeaconConsensusEngineHandle { to_engine }; @@ -303,7 +297,7 @@ where payload_validator: ExecutionPayloadValidator::new(blockchain.chain_spec()), blockchain, sync_state_updater, - engine_message_rx: UnboundedReceiverStream::new(rx), + engine_message_stream, handle: handle.clone(), forkchoice_state_tracker: Default::default(), payload_builder, @@ -1770,7 +1764,7 @@ where // // These messages can affect the state of the SyncController and they're also time // sensitive, hence they are polled first. - if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { + if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) { match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { this.on_forkchoice_updated(state, payload_attrs, tx); diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index 3caf5d9d1..e19b4d242 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -44,6 +44,8 @@ discv5.workspace = true # async tokio.workspace = true +tokio-util.workspace = true +pin-project.workspace = true # metrics metrics-exporter-prometheus = "0.12.1" diff --git a/crates/node-core/src/engine_api_store.rs b/crates/node-core/src/engine/engine_store.rs similarity index 78% rename from crates/node-core/src/engine_api_store.rs rename to crates/node-core/src/engine/engine_store.rs index 5552137f6..524e2c89b 100644 --- a/crates/node-core/src/engine_api_store.rs +++ b/crates/node-core/src/engine/engine_store.rs @@ -1,5 +1,6 @@ //! Stores engine API messages to disk for later inspection and replay. +use futures::{Stream, StreamExt}; use reth_beacon_consensus::BeaconEngineMessage; use reth_engine_primitives::EngineTypes; use reth_primitives::fs; @@ -8,8 +9,13 @@ use reth_rpc_types::{ ExecutionPayload, }; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, path::PathBuf, time::SystemTime}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use std::{ + collections::BTreeMap, + path::PathBuf, + pin::Pin, + task::{ready, Context, Poll}, + time::SystemTime, +}; use tracing::*; /// A message from the engine API that has been stored to disk. @@ -34,13 +40,13 @@ pub enum StoredEngineApiMessage { /// This can read and write engine API messages in a specific directory. #[derive(Debug)] -pub struct EngineApiStore { +pub struct EngineMessageStore { /// The path to the directory that stores the engine API messages. path: PathBuf, } -impl EngineApiStore { - /// Creates a new [EngineApiStore] at the given path. +impl EngineMessageStore { + /// Creates a new [EngineMessageStore] at the given path. /// /// The path is expected to be a directory, where individual message JSON files will be stored. pub fn new(path: PathBuf) -> Self { @@ -108,22 +114,42 @@ impl EngineApiStore { } Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths)) } +} - /// Intercepts an incoming engine API message, storing it to disk and forwarding it to the - /// engine channel. - pub async fn intercept( - self, - mut rx: UnboundedReceiver>, - to_engine: UnboundedSender>, - ) where - Engine: EngineTypes, - BeaconEngineMessage: std::fmt::Debug, - { - while let Some(msg) = rx.recv().await { - if let Err(error) = self.on_message(&msg, SystemTime::now()) { - error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message"); - } - let _ = to_engine.send(msg); - } +/// A wrapper stream that stores Engine API messages in +/// the specified directory. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct EngineStoreStream { + /// Inner message stream. + #[pin] + stream: S, + /// Engine message store. + store: EngineMessageStore, +} + +impl EngineStoreStream { + /// Create new engine store stream wrapper. + pub fn new(stream: S, path: PathBuf) -> Self { + Self { stream, store: EngineMessageStore::new(path) } + } +} + +impl Stream for EngineStoreStream +where + Engine: EngineTypes, + S: Stream>, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + let next = ready!(this.stream.poll_next_unpin(cx)); + if let Some(msg) = &next { + if let Err(error) = this.store.on_message(msg, SystemTime::now()) { + error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message"); + } + } + Poll::Ready(next) } } diff --git a/crates/node-core/src/engine/mod.rs b/crates/node-core/src/engine/mod.rs new file mode 100644 index 000000000..4ba8479e9 --- /dev/null +++ b/crates/node-core/src/engine/mod.rs @@ -0,0 +1,71 @@ +//! Collection of various stream utilities for consensus engine. + +use futures::Stream; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_engine_primitives::EngineTypes; +use std::path::PathBuf; +use tokio_util::either::Either; + +pub mod engine_store; +use engine_store::EngineStoreStream; + +pub mod skip_fcu; +use skip_fcu::EngineSkipFcu; + +/// The collection of stream extensions for engine API message stream. +pub trait EngineMessageStreamExt: + Stream> +{ + /// Skips the specified number of [BeaconEngineMessage::ForkchoiceUpdated] messages from the + /// engine message stream. + fn skip_fcu(self, count: usize) -> EngineSkipFcu + where + Self: Sized, + { + EngineSkipFcu::new(self, count) + } + + /// If the count is [Some], returns the stream that skips the specified number of + /// [BeaconEngineMessage::ForkchoiceUpdated] messages. Otherwise, returns `Self`. + fn maybe_skip_fcu(self, maybe_count: Option) -> Either, Self> + where + Self: Sized, + { + if let Some(count) = maybe_count { + Either::Left(self.skip_fcu(count)) + } else { + Either::Right(self) + } + } + + /// Stores engine messages at the specified location. + fn store_messages(self, path: PathBuf) -> EngineStoreStream + where + Self: Sized, + { + EngineStoreStream::new(self, path) + } + + /// If the path is [Some], returns the stream that stores engine messages at the specified + /// location. Otherwise, returns `Self`. + fn maybe_store_messages( + self, + maybe_path: Option, + ) -> Either, Self> + where + Self: Sized, + { + if let Some(path) = maybe_path { + Either::Left(self.store_messages(path)) + } else { + Either::Right(self) + } + } +} + +impl EngineMessageStreamExt for T +where + Engine: EngineTypes, + T: Stream>, +{ +} diff --git a/crates/node-core/src/engine/skip_fcu.rs b/crates/node-core/src/engine/skip_fcu.rs new file mode 100644 index 000000000..34004134f --- /dev/null +++ b/crates/node-core/src/engine/skip_fcu.rs @@ -0,0 +1,64 @@ +//! Stores engine API messages to disk for later inspection and replay. + +use futures::{Stream, StreamExt}; +use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated}; +use reth_engine_primitives::EngineTypes; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +/// Engine API stream wrapper that skips the specified number of forkchoice updated messages. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct EngineSkipFcu { + #[pin] + stream: S, + /// The number of FCUs to skip. + threshold: usize, + /// Current count of skipped FCUs. + skipped: usize, +} + +impl EngineSkipFcu { + /// Creates new [EngineSkipFcu] stream wrapper. + pub fn new(stream: S, threshold: usize) -> Self { + Self { + stream, + threshold, + // Start with `threshold` so that the first FCU goes through. + skipped: threshold, + } + } +} + +impl Stream for EngineSkipFcu +where + Engine: EngineTypes, + S: Stream>, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + let next = ready!(this.stream.poll_next_unpin(cx)); + let item = match next { + Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => { + if this.skipped < this.threshold { + *this.skipped += 1; + tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU"); + let _ = tx.send(Ok(OnForkChoiceUpdated::syncing())); + continue + } else { + *this.skipped = 0; + Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) + } + } + next => next, + }; + return Poll::Ready(item) + } + } +} diff --git a/crates/node-core/src/engine_skip_fcu.rs b/crates/node-core/src/engine_skip_fcu.rs deleted file mode 100644 index a6e5e1b01..000000000 --- a/crates/node-core/src/engine_skip_fcu.rs +++ /dev/null @@ -1,55 +0,0 @@ -//! Stores engine API messages to disk for later inspection and replay. - -use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated}; -use reth_engine_primitives::EngineTypes; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -/// Intercept Engine API message and skip FCUs. -#[derive(Debug)] -pub struct EngineApiSkipFcu { - /// The number of FCUs to skip. - threshold: usize, - /// Current count of skipped FCUs. - skipped: usize, -} - -impl EngineApiSkipFcu { - /// Creates new [EngineApiSkipFcu] interceptor. - pub fn new(threshold: usize) -> Self { - Self { - threshold, - // Start with `threshold` so that the first FCU goes through. - skipped: threshold, - } - } - - /// Intercepts an incoming engine API message, skips FCU or forwards it - /// to the engine depending on current number of skipped FCUs. - pub async fn intercept( - mut self, - mut rx: UnboundedReceiver>, - to_engine: UnboundedSender>, - ) where - Engine: EngineTypes, - BeaconEngineMessage: std::fmt::Debug, - { - while let Some(msg) = rx.recv().await { - if let BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } = msg { - if self.skipped < self.threshold { - self.skipped += 1; - tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=self.threshold, skipped=self.skipped, "Skipping FCU"); - let _ = tx.send(Ok(OnForkChoiceUpdated::syncing())); - } else { - self.skipped = 0; - let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - }); - } - } else { - let _ = to_engine.send(msg); - } - } - } -} diff --git a/crates/node-core/src/lib.rs b/crates/node-core/src/lib.rs index 3d73e0e61..024467ab1 100644 --- a/crates/node-core/src/lib.rs +++ b/crates/node-core/src/lib.rs @@ -11,8 +11,7 @@ pub mod args; pub mod cli; pub mod dirs; -pub mod engine_api_store; -pub mod engine_skip_fcu; +pub mod engine; pub mod exit; pub mod init; pub mod metrics; diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index ef671f127..136c27d7c 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -47,6 +47,7 @@ tokio = { workspace = true, features = [ "time", "rt-multi-thread", ] } +tokio-stream.workspace = true ## misc aquamarine.workspace = true diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index bd81f8386..a372bedf0 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -14,7 +14,8 @@ use reth_beacon_consensus::{ BeaconConsensus, BeaconConsensusEngine, }; use reth_blockchain_tree::{ - BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals, + noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, + TreeExternals, }; use reth_consensus::Consensus; use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; @@ -23,8 +24,7 @@ use reth_network::NetworkEvents; use reth_node_api::{FullNodeComponents, FullNodeTypes}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, - engine_api_store::EngineApiStore, - engine_skip_fcu::EngineApiSkipFcu, + engine::EngineMessageStreamExt, exit::NodeExitFuture, }; use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; @@ -37,10 +37,10 @@ use reth_tracing::tracing::{debug, info}; use reth_transaction_pool::TransactionPool; use std::{future::Future, sync::Arc}; use tokio::sync::{mpsc::unbounded_channel, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; pub mod common; pub use common::LaunchContext; -use reth_blockchain_tree::noop::NoopBlockchainTree; /// A general purpose trait that launches a new node of any kind. /// @@ -261,29 +261,15 @@ where // create pipeline let network_client = node_adapter.network().fetch_client().await?; - let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel(); + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - if let Some(skip_fcu_threshold) = ctx.node_config().debug.skip_fcu { - debug!(target: "reth::cli", "spawning skip FCU task"); - let (skip_fcu_tx, skip_fcu_rx) = unbounded_channel(); - let engine_skip_fcu = EngineApiSkipFcu::new(skip_fcu_threshold); - ctx.task_executor().spawn_critical( - "skip FCU interceptor", - engine_skip_fcu.intercept(consensus_engine_rx, skip_fcu_tx), - ); - consensus_engine_rx = skip_fcu_rx; - } - - if let Some(store_path) = ctx.node_config().debug.engine_api_store.clone() { - debug!(target: "reth::cli", "spawning engine API store"); - let (engine_intercept_tx, engine_intercept_rx) = unbounded_channel(); - let engine_api_store = EngineApiStore::new(store_path); - ctx.task_executor().spawn_critical( - "engine api interceptor", - engine_api_store.intercept(consensus_engine_rx, engine_intercept_tx), - ); - consensus_engine_rx = engine_intercept_rx; - }; + let node_config = ctx.node_config(); + let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + .maybe_skip_fcu(node_config.debug.skip_fcu) + // Store messages _after_ skipping messages so that `replay-engine` command + // would replay the exact same messages that were observed by the engine + // during this run. + .maybe_store_messages(node_config.debug.engine_api_store.clone()); let max_block = ctx.max_block(network_client.clone()).await?; let mut hooks = EngineHooks::new(); @@ -303,8 +289,7 @@ where info!(target: "reth::cli", "Starting Reth in dev mode"); for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() { - info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx, -address.to_string(), format_ether(alloc.balance)); + info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx, address.to_string(), format_ether(alloc.balance)); } // install auto-seal @@ -395,7 +380,7 @@ address.to_string(), format_ether(alloc.balance)); initial_target, reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, consensus_engine_tx, - consensus_engine_rx, + Box::pin(consensus_engine_stream), hooks, )?; info!(target: "reth::cli", "Consensus engine initialized");