mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore(engine): enable engine debug streams in new implementation (#10282)
This commit is contained in:
@ -30,7 +30,6 @@ reth-tasks.workspace = true
|
||||
# async
|
||||
futures.workspace = true
|
||||
pin-project.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
# misc
|
||||
thiserror.workspace = true
|
||||
@ -46,3 +45,4 @@ reth-primitives.workspace = true
|
||||
reth-prune-types.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tokio-stream.workspace = true
|
||||
|
||||
@ -30,13 +30,15 @@ use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
/// Alias for consensus engine stream.
|
||||
type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
|
||||
|
||||
/// Alias for chain orchestrator.
|
||||
type EngineServiceType<DB, Client, T> = ChainOrchestrator<
|
||||
EngineHandler<
|
||||
EngineApiRequestHandler<EngineApiRequest<T>>,
|
||||
UnboundedReceiverStream<BeaconEngineMessage<T>>,
|
||||
EngineMessageStream<T>,
|
||||
BasicBlockDownloader<Client>,
|
||||
>,
|
||||
PipelineSync<DB>,
|
||||
@ -70,7 +72,7 @@ where
|
||||
executor_factory: E,
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
client: Client,
|
||||
incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<T>>,
|
||||
incoming_requests: EngineMessageStream<T>,
|
||||
pipeline: Pipeline<DB>,
|
||||
pipeline_task_spawner: Box<dyn TaskSpawner>,
|
||||
provider: ProviderFactory<DB>,
|
||||
@ -149,6 +151,7 @@ mod tests {
|
||||
use reth_tasks::TokioTaskExecutor;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc::unbounded_channel, watch};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
#[test]
|
||||
fn eth_chain_orchestrator_build() {
|
||||
@ -185,7 +188,7 @@ mod tests {
|
||||
executor_factory,
|
||||
chain_spec,
|
||||
client,
|
||||
incoming_requests,
|
||||
Box::pin(incoming_requests),
|
||||
pipeline,
|
||||
pipeline_task_spawner,
|
||||
provider_factory,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
//! Stream wrapper that simulates reorgs.
|
||||
|
||||
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
|
||||
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
|
||||
use itertools::Either;
|
||||
use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated};
|
||||
use reth_engine_primitives::EngineTypes;
|
||||
@ -26,6 +26,7 @@ use reth_rpc_types_compat::engine::payload::block_to_payload;
|
||||
use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
@ -43,6 +44,8 @@ type EngineReorgResponse = Result<
|
||||
oneshot::error::RecvError,
|
||||
>;
|
||||
|
||||
type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
|
||||
|
||||
/// Engine API stream wrapper that simulates reorgs with specified frequency.
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
@ -66,7 +69,7 @@ pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
|
||||
/// Last forkchoice state.
|
||||
last_forkchoice_state: Option<ForkchoiceState>,
|
||||
/// Pending engine responses to reorg messages.
|
||||
reorg_responses: FuturesUnordered<BoxFuture<'static, EngineReorgResponse>>,
|
||||
reorg_responses: FuturesUnordered<ReorgResponseFut>,
|
||||
}
|
||||
|
||||
impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm> {
|
||||
@ -181,10 +184,8 @@ where
|
||||
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
|
||||
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
|
||||
this.reorg_responses.extend([
|
||||
Box::pin(reorg_payload_rx.map_ok(Either::Left))
|
||||
as BoxFuture<'static, EngineReorgResponse>,
|
||||
Box::pin(reorg_fcu_rx.map_ok(Either::Right))
|
||||
as BoxFuture<'static, EngineReorgResponse>,
|
||||
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
|
||||
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
|
||||
]);
|
||||
|
||||
*this.state = EngineReorgState::Reorg {
|
||||
|
||||
@ -20,6 +20,7 @@ use reth_engine_tree::{
|
||||
engine::{EngineApiRequest, EngineRequestHandler},
|
||||
tree::TreeConfig,
|
||||
};
|
||||
use reth_engine_util::EngineMessageStreamExt;
|
||||
use reth_exex::ExExManagerHandle;
|
||||
use reth_network::{NetworkSyncUpdater, SyncState};
|
||||
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
|
||||
@ -133,6 +134,21 @@ where
|
||||
let network_client = ctx.components().network().fetch_client().await?;
|
||||
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
|
||||
|
||||
let node_config = ctx.node_config();
|
||||
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
|
||||
.maybe_skip_fcu(node_config.debug.skip_fcu)
|
||||
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
|
||||
.maybe_reorg(
|
||||
ctx.blockchain_db().clone(),
|
||||
ctx.components().evm_config().clone(),
|
||||
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
|
||||
node_config.debug.reorg_frequency,
|
||||
)
|
||||
// Store messages _after_ skipping so that `replay-engine` command
|
||||
// would replay only the messages that were observed by the engine
|
||||
// during this run.
|
||||
.maybe_store_messages(node_config.debug.engine_api_store.clone());
|
||||
|
||||
let max_block = ctx.max_block(network_client.clone()).await?;
|
||||
let mut hooks = EngineHooks::new();
|
||||
|
||||
@ -179,7 +195,7 @@ where
|
||||
ctx.components().block_executor().clone(),
|
||||
ctx.chain_spec(),
|
||||
network_client.clone(),
|
||||
UnboundedReceiverStream::new(consensus_engine_rx),
|
||||
Box::pin(consensus_engine_stream),
|
||||
pipeline,
|
||||
Box::new(ctx.task_executor().clone()),
|
||||
ctx.provider_factory().clone(),
|
||||
|
||||
Reference in New Issue
Block a user