chore: move EventSender out of BeaconConsensusEngineHandle (#13533)

This commit is contained in:
Arsenii Kulikov
2024-12-24 02:44:19 +04:00
committed by GitHub
parent 6822d4f18b
commit c6d42ad8c4
5 changed files with 8 additions and 23 deletions

View File

@ -1,6 +1,6 @@
//! `BeaconConsensusEngine` external API //! `BeaconConsensusEngine` external API
use crate::{BeaconConsensusEngineEvent, BeaconForkChoiceUpdateError}; use crate::BeaconForkChoiceUpdateError;
use alloy_rpc_types_engine::{ use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
}; };
@ -10,7 +10,6 @@ use reth_engine_primitives::{
OnForkChoiceUpdated, OnForkChoiceUpdated,
}; };
use reth_errors::RethResult; use reth_errors::RethResult;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus /// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
@ -23,7 +22,6 @@ where
Engine: EngineTypes, Engine: EngineTypes,
{ {
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>, pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
} }
// === impl BeaconConsensusEngineHandle === // === impl BeaconConsensusEngineHandle ===
@ -33,11 +31,8 @@ where
Engine: EngineTypes, Engine: EngineTypes,
{ {
/// Creates a new beacon consensus engine handle. /// Creates a new beacon consensus engine handle.
pub const fn new( pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>, Self { to_engine }
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self { to_engine, event_sender }
} }
/// Sends a new payload message to the beacon consensus engine and waits for a response. /// Sends a new payload message to the beacon consensus engine and waits for a response.
@ -96,9 +91,4 @@ where
pub fn transition_configuration_exchanged(&self) { pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
} }
/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
self.event_sender.new_listener()
}
} }

View File

@ -299,7 +299,7 @@ where
hooks: EngineHooks, hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> { ) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
let event_sender = EventSender::default(); let event_sender = EventSender::default();
let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone()); let handle = BeaconConsensusEngineHandle::new(to_engine);
let sync = EngineSyncController::new( let sync = EngineSyncController::new(
pipeline, pipeline,
client, client,

View File

@ -213,8 +213,7 @@ where
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
let event_sender = EventSender::default(); let event_sender = EventSender::default();
let beacon_engine_handle = let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone());
// extract the jwt secret from the args if possible // extract the jwt secret from the args if possible
let jwt_secret = ctx.auth_jwt_secret()?; let jwt_secret = ctx.auth_jwt_secret()?;
@ -271,7 +270,7 @@ where
info!(target: "reth::cli", "Consensus engine initialized"); info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select!( let events = stream_select!(
beacon_engine_handle.event_listener().map(Into::into), event_sender.new_listener().map(Into::into),
pipeline_events.map(Into::into), pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left( Either::Left(

View File

@ -37,8 +37,7 @@ pub const fn test_address() -> SocketAddr {
pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
let config = AuthServerConfig::builder(secret).socket_addr(test_address()).build(); let config = AuthServerConfig::builder(secret).socket_addr(test_address()).build();
let (tx, _rx) = unbounded_channel(); let (tx, _rx) = unbounded_channel();
let beacon_engine_handle = let beacon_engine_handle = BeaconConsensusEngineHandle::<EthEngineTypes>::new(tx);
BeaconConsensusEngineHandle::<EthEngineTypes>::new(tx, Default::default());
let client = ClientVersionV1 { let client = ClientVersionV1 {
code: ClientCode::RH, code: ClientCode::RH,
name: "Reth".to_string(), name: "Reth".to_string(),

View File

@ -1031,7 +1031,6 @@ mod tests {
use super::*; use super::*;
use alloy_rpc_types_engine::{ClientCode, ClientVersionV1}; use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_chainspec::{ChainSpec, MAINNET}; use reth_chainspec::{ChainSpec, MAINNET};
use reth_engine_primitives::BeaconEngineMessage; use reth_engine_primitives::BeaconEngineMessage;
use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator}; use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
@ -1041,7 +1040,6 @@ mod tests {
use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block; use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
use reth_tasks::TokioTaskExecutor; use reth_tasks::TokioTaskExecutor;
use reth_testing_utils::generators::random_block; use reth_testing_utils::generators::random_block;
use reth_tokio_util::EventSender;
use reth_transaction_pool::noop::NoopTransactionPool; use reth_transaction_pool::noop::NoopTransactionPool;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
@ -1066,12 +1064,11 @@ mod tests {
let provider = Arc::new(MockEthProvider::default()); let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service(); let payload_store = spawn_test_payload_service();
let (to_engine, engine_rx) = unbounded_channel(); let (to_engine, engine_rx) = unbounded_channel();
let event_sender: EventSender<BeaconConsensusEngineEvent> = Default::default();
let task_executor = Box::<TokioTaskExecutor>::default(); let task_executor = Box::<TokioTaskExecutor>::default();
let api = EngineApi::new( let api = EngineApi::new(
provider.clone(), provider.clone(),
chain_spec.clone(), chain_spec.clone(),
BeaconConsensusEngineHandle::new(to_engine, event_sender), BeaconConsensusEngineHandle::new(to_engine),
payload_store.into(), payload_store.into(),
NoopTransactionPool::default(), NoopTransactionPool::default(),
task_executor, task_executor,