refactor: engine interceptors (#8048)

This commit is contained in:
Roman Krasiuk
2024-05-02 17:17:28 +02:00
committed by GitHub
parent 1603113ce5
commit e68ab2f58c
11 changed files with 220 additions and 133 deletions

3
Cargo.lock generated
View File

@ -7196,6 +7196,7 @@ dependencies = [
"reth-transaction-pool",
"tempfile",
"tokio",
"tokio-stream",
]
[[package]]
@ -7218,6 +7219,7 @@ dependencies = [
"metrics-process",
"metrics-util",
"once_cell",
"pin-project",
"procfs",
"proptest",
"rand 0.8.5",
@ -7256,6 +7258,7 @@ dependencies = [
"thiserror",
"tikv-jemalloc-ctl",
"tokio",
"tokio-util",
"tracing",
"vergen",
]

View File

@ -19,7 +19,7 @@ use reth_consensus::Consensus;
use reth_db::{init_db, DatabaseEnv};
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_node_core::engine_api_store::{EngineApiStore, StoredEngineApiMessage};
use reth_node_core::engine::engine_store::{EngineMessageStore, StoredEngineApiMessage};
#[cfg(not(feature = "optimism"))]
use reth_node_ethereum::{EthEngineTypes, EthEvmConfig};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
@ -34,7 +34,7 @@ use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_transaction_pool::noop::NoopTransactionPool;
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tracing::*;
/// `reth debug replay-engine` command
@ -191,8 +191,7 @@ impl Command {
// Configure the consensus engine
let network_client = network.fetch_client().await?;
let (consensus_engine_tx, consensus_engine_rx) = mpsc::unbounded_channel();
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::new(
network_client,
Pipeline::builder().build(
provider_factory.clone(),
@ -210,8 +209,6 @@ impl Command {
payload_builder,
None,
u64::MAX,
consensus_engine_tx,
consensus_engine_rx,
EngineHooks::new(),
)?;
info!(target: "reth::cli", "Consensus engine initialized");
@ -224,7 +221,7 @@ impl Command {
let _ = tx.send(res);
});
let engine_api_store = EngineApiStore::new(self.engine_api_store.clone());
let engine_api_store = EngineMessageStore::new(self.engine_api_store.clone());
for filepath in engine_api_store.engine_messages_iter()? {
let contents =
fs::read(&filepath).wrap_err(format!("failed to read: {}", filepath.display()))?;

View File

@ -1,18 +1,10 @@
use crate::{
engine::{
forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker},
metrics::EngineMetrics,
},
hooks::{EngineHookContext, EngineHooksController},
sync::{EngineSyncController, EngineSyncEvent},
};
use futures::{Future, StreamExt};
use futures::{stream::BoxStream, Future, StreamExt};
use reth_db::database::Database;
use reth_engine_primitives::{EngineTypes, PayloadAttributes, PayloadBuilderAttributes};
use reth_interfaces::{
blockchain_tree::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
},
executor::BlockValidationError,
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
@ -21,6 +13,7 @@ use reth_interfaces::{
RethError, RethResult,
};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock,
SealedHeader, B256,
@ -43,7 +36,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -68,18 +61,19 @@ mod handle;
pub use handle::BeaconConsensusEngineHandle;
mod forkchoice;
use crate::hooks::{EngineHookEvent, EngineHooks, PolledHook};
pub use forkchoice::ForkchoiceStatus;
use reth_interfaces::blockchain_tree::BlockValidationKind;
use reth_payload_validator::ExecutionPayloadValidator;
use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker};
mod metrics;
use metrics::EngineMetrics;
pub(crate) mod sync;
use sync::{EngineSyncController, EngineSyncEvent};
/// Hooks for running during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub mod hooks;
use hooks::{EngineHookContext, EngineHookEvent, EngineHooks, EngineHooksController, PolledHook};
#[cfg(test)]
pub mod test_utils;
@ -180,7 +174,7 @@ where
/// Used for emitting updates about whether the engine is syncing or not.
sync_state_updater: Box<dyn NetworkSyncUpdater>,
/// The Engine API message receiver.
engine_message_rx: UnboundedReceiverStream<BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
/// A clone of the handle
handle: BeaconConsensusEngineHandle<EngineT>,
/// Tracks the received forkchoice state updates received by the CL.
@ -254,7 +248,7 @@ where
target,
pipeline_run_threshold,
to_engine,
rx,
Box::pin(UnboundedReceiverStream::from(rx)),
hooks,
)
}
@ -284,7 +278,7 @@ where
target: Option<B256>,
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
rx: UnboundedReceiver<BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let handle = BeaconConsensusEngineHandle { to_engine };
@ -303,7 +297,7 @@ where
payload_validator: ExecutionPayloadValidator::new(blockchain.chain_spec()),
blockchain,
sync_state_updater,
engine_message_rx: UnboundedReceiverStream::new(rx),
engine_message_stream,
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
@ -1770,7 +1764,7 @@ where
//
// These messages can affect the state of the SyncController and they're also time
// sensitive, hence they are polled first.
if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
this.on_forkchoice_updated(state, payload_attrs, tx);

View File

@ -44,6 +44,8 @@ discv5.workspace = true
# async
tokio.workspace = true
tokio-util.workspace = true
pin-project.workspace = true
# metrics
metrics-exporter-prometheus = "0.12.1"

View File

@ -1,5 +1,6 @@
//! Stores engine API messages to disk for later inspection and replay.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_primitives::fs;
@ -8,8 +9,13 @@ use reth_rpc_types::{
ExecutionPayload,
};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, path::PathBuf, time::SystemTime};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use std::{
collections::BTreeMap,
path::PathBuf,
pin::Pin,
task::{ready, Context, Poll},
time::SystemTime,
};
use tracing::*;
/// A message from the engine API that has been stored to disk.
@ -34,13 +40,13 @@ pub enum StoredEngineApiMessage<Attributes> {
/// This can read and write engine API messages in a specific directory.
#[derive(Debug)]
pub struct EngineApiStore {
pub struct EngineMessageStore {
/// The path to the directory that stores the engine API messages.
path: PathBuf,
}
impl EngineApiStore {
/// Creates a new [EngineApiStore] at the given path.
impl EngineMessageStore {
/// Creates a new [EngineMessageStore] at the given path.
///
/// The path is expected to be a directory, where individual message JSON files will be stored.
pub fn new(path: PathBuf) -> Self {
@ -108,22 +114,42 @@ impl EngineApiStore {
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
}
}
/// Intercepts an incoming engine API message, storing it to disk and forwarding it to the
/// engine channel.
pub async fn intercept<Engine>(
self,
mut rx: UnboundedReceiver<BeaconEngineMessage<Engine>>,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
) where
Engine: EngineTypes,
BeaconEngineMessage<Engine>: std::fmt::Debug,
{
while let Some(msg) = rx.recv().await {
if let Err(error) = self.on_message(&msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
}
let _ = to_engine.send(msg);
}
/// A wrapper stream that stores Engine API messages in
/// the specified directory.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineStoreStream<S> {
/// Inner message stream.
#[pin]
stream: S,
/// Engine message store.
store: EngineMessageStore,
}
impl<S> EngineStoreStream<S> {
/// Create new engine store stream wrapper.
pub fn new(stream: S, path: PathBuf) -> Self {
Self { stream, store: EngineMessageStore::new(path) }
}
}
impl<Engine, S> Stream for EngineStoreStream<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = ready!(this.stream.poll_next_unpin(cx));
if let Some(msg) = &next {
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
}
}
Poll::Ready(next)
}
}

View File

@ -0,0 +1,71 @@
//! Collection of various stream utilities for consensus engine.
use futures::Stream;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use std::path::PathBuf;
use tokio_util::either::Either;
pub mod engine_store;
use engine_store::EngineStoreStream;
pub mod skip_fcu;
use skip_fcu::EngineSkipFcu;
/// The collection of stream extensions for engine API message stream.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
Stream<Item = BeaconEngineMessage<Engine>>
{
/// Skips the specified number of [BeaconEngineMessage::ForkchoiceUpdated] messages from the
/// engine message stream.
fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
where
Self: Sized,
{
EngineSkipFcu::new(self, count)
}
/// If the count is [Some], returns the stream that skips the specified number of
/// [BeaconEngineMessage::ForkchoiceUpdated] messages. Otherwise, returns `Self`.
fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
where
Self: Sized,
{
if let Some(count) = maybe_count {
Either::Left(self.skip_fcu(count))
} else {
Either::Right(self)
}
}
/// Stores engine messages at the specified location.
fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
where
Self: Sized,
{
EngineStoreStream::new(self, path)
}
/// If the path is [Some], returns the stream that stores engine messages at the specified
/// location. Otherwise, returns `Self`.
fn maybe_store_messages(
self,
maybe_path: Option<PathBuf>,
) -> Either<EngineStoreStream<Self>, Self>
where
Self: Sized,
{
if let Some(path) = maybe_path {
Either::Left(self.store_messages(path))
} else {
Either::Right(self)
}
}
}
impl<Engine, T> EngineMessageStreamExt<Engine> for T
where
Engine: EngineTypes,
T: Stream<Item = BeaconEngineMessage<Engine>>,
{
}

View File

@ -0,0 +1,64 @@
//! Stores engine API messages to disk for later inspection and replay.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
/// Engine API stream wrapper that skips the specified number of forkchoice updated messages.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineSkipFcu<S> {
#[pin]
stream: S,
/// The number of FCUs to skip.
threshold: usize,
/// Current count of skipped FCUs.
skipped: usize,
}
impl<S> EngineSkipFcu<S> {
/// Creates new [EngineSkipFcu] stream wrapper.
pub fn new(stream: S, threshold: usize) -> Self {
Self {
stream,
threshold,
// Start with `threshold` so that the first FCU goes through.
skipped: threshold,
}
}
}
impl<Engine, S> Stream for EngineSkipFcu<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
continue
} else {
*this.skipped = 0;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
}
}
next => next,
};
return Poll::Ready(item)
}
}
}

View File

@ -1,55 +0,0 @@
//! Stores engine API messages to disk for later inspection and replay.
use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// Intercept Engine API message and skip FCUs.
#[derive(Debug)]
pub struct EngineApiSkipFcu {
/// The number of FCUs to skip.
threshold: usize,
/// Current count of skipped FCUs.
skipped: usize,
}
impl EngineApiSkipFcu {
/// Creates new [EngineApiSkipFcu] interceptor.
pub fn new(threshold: usize) -> Self {
Self {
threshold,
// Start with `threshold` so that the first FCU goes through.
skipped: threshold,
}
}
/// Intercepts an incoming engine API message, skips FCU or forwards it
/// to the engine depending on current number of skipped FCUs.
pub async fn intercept<Engine>(
mut self,
mut rx: UnboundedReceiver<BeaconEngineMessage<Engine>>,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
) where
Engine: EngineTypes,
BeaconEngineMessage<Engine>: std::fmt::Debug,
{
while let Some(msg) = rx.recv().await {
if let BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } = msg {
if self.skipped < self.threshold {
self.skipped += 1;
tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=self.threshold, skipped=self.skipped, "Skipping FCU");
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
} else {
self.skipped = 0;
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
}
} else {
let _ = to_engine.send(msg);
}
}
}
}

View File

@ -11,8 +11,7 @@
pub mod args;
pub mod cli;
pub mod dirs;
pub mod engine_api_store;
pub mod engine_skip_fcu;
pub mod engine;
pub mod exit;
pub mod init;
pub mod metrics;

View File

@ -47,6 +47,7 @@ tokio = { workspace = true, features = [
"time",
"rt-multi-thread",
] }
tokio-stream.workspace = true
## misc
aquamarine.workspace = true

View File

@ -14,7 +14,8 @@ use reth_beacon_consensus::{
BeaconConsensus, BeaconConsensusEngine,
};
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_consensus::Consensus;
use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle};
@ -23,8 +24,7 @@ use reth_network::NetworkEvents;
use reth_node_api::{FullNodeComponents, FullNodeTypes};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
engine_api_store::EngineApiStore,
engine_skip_fcu::EngineApiSkipFcu,
engine::EngineMessageStreamExt,
exit::NodeExitFuture,
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
@ -37,10 +37,10 @@ use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::TransactionPool;
use std::{future::Future, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
pub mod common;
pub use common::LaunchContext;
use reth_blockchain_tree::noop::NoopBlockchainTree;
/// A general purpose trait that launches a new node of any kind.
///
@ -261,29 +261,15 @@ where
// create pipeline
let network_client = node_adapter.network().fetch_client().await?;
let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel();
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
if let Some(skip_fcu_threshold) = ctx.node_config().debug.skip_fcu {
debug!(target: "reth::cli", "spawning skip FCU task");
let (skip_fcu_tx, skip_fcu_rx) = unbounded_channel();
let engine_skip_fcu = EngineApiSkipFcu::new(skip_fcu_threshold);
ctx.task_executor().spawn_critical(
"skip FCU interceptor",
engine_skip_fcu.intercept(consensus_engine_rx, skip_fcu_tx),
);
consensus_engine_rx = skip_fcu_rx;
}
if let Some(store_path) = ctx.node_config().debug.engine_api_store.clone() {
debug!(target: "reth::cli", "spawning engine API store");
let (engine_intercept_tx, engine_intercept_rx) = unbounded_channel();
let engine_api_store = EngineApiStore::new(store_path);
ctx.task_executor().spawn_critical(
"engine api interceptor",
engine_api_store.intercept(consensus_engine_rx, engine_intercept_tx),
);
consensus_engine_rx = engine_intercept_rx;
};
let node_config = ctx.node_config();
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
.maybe_skip_fcu(node_config.debug.skip_fcu)
// Store messages _after_ skipping messages so that `replay-engine` command
// would replay the exact same messages that were observed by the engine
// during this run.
.maybe_store_messages(node_config.debug.engine_api_store.clone());
let max_block = ctx.max_block(network_client.clone()).await?;
let mut hooks = EngineHooks::new();
@ -303,8 +289,7 @@ where
info!(target: "reth::cli", "Starting Reth in dev mode");
for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() {
info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx,
address.to_string(), format_ether(alloc.balance));
info!(target: "reth::cli", "Allocated Genesis Account: {:02}. {} ({} ETH)", idx, address.to_string(), format_ether(alloc.balance));
}
// install auto-seal
@ -395,7 +380,7 @@ address.to_string(), format_ether(alloc.balance));
initial_target,
reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx,
consensus_engine_rx,
Box::pin(consensus_engine_stream),
hooks,
)?;
info!(target: "reth::cli", "Consensus engine initialized");