feat: make EngineApiRequestHandler generic over request (#10147)

This commit is contained in:
Matthias Seitz
2024-08-06 23:27:38 +02:00
committed by GitHub
parent 5ca4921198
commit f8104cc93f
2 changed files with 17 additions and 15 deletions

View File

@ -6,8 +6,7 @@ use crate::{
download::{BlockDownloader, DownloadAction, DownloadOutcome}, download::{BlockDownloader, DownloadAction, DownloadOutcome},
}; };
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage}; use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256}; use reth_primitives::{SealedBlockWithSenders, B256};
use std::{ use std::{
collections::HashSet, collections::HashSet,
@ -54,12 +53,18 @@ impl<T, S, D> EngineHandler<T, S, D> {
{ {
Self { handler, incoming_requests, downloader } Self { handler, incoming_requests, downloader }
} }
/// Returns a mutable reference to the request handler.
pub fn handler_mut(&mut self) -> &mut T {
&mut self.handler
}
} }
impl<T, S, D> ChainHandler for EngineHandler<T, S, D> impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
where where
T: EngineRequestHandler, T: EngineRequestHandler,
S: Stream<Item = T::Request> + Send + Sync + Unpin + 'static, S: Stream + Send + Sync + Unpin + 'static,
<S as Stream>::Item: Into<T::Request>,
D: BlockDownloader, D: BlockDownloader,
{ {
type Event = T::Event; type Event = T::Event;
@ -98,7 +103,7 @@ where
// pop the next incoming request // pop the next incoming request
if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) { if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
// and delegate the request to the handler // and delegate the request to the handler
self.handler.on_event(FromEngine::Request(req)); self.handler.on_event(FromEngine::Request(req.into()));
// skip downloading in this iteration to allow the handler to process the request // skip downloading in this iteration to allow the handler to process the request
continue continue
} }
@ -156,32 +161,29 @@ pub trait EngineRequestHandler: Send + Sync {
/// In case required blocks are missing, the handler will request them from the network, by emitting /// In case required blocks are missing, the handler will request them from the network, by emitting
/// a download request upstream. /// a download request upstream.
#[derive(Debug)] #[derive(Debug)]
pub struct EngineApiRequestHandler<T: EngineTypes> { pub struct EngineApiRequestHandler<Request> {
/// channel to send messages to the tree to execute the payload. /// channel to send messages to the tree to execute the payload.
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>, to_tree: Sender<FromEngine<Request>>,
/// channel to receive messages from the tree. /// channel to receive messages from the tree.
from_tree: UnboundedReceiver<EngineApiEvent>, from_tree: UnboundedReceiver<EngineApiEvent>,
} }
impl<T> EngineApiRequestHandler<T> impl<Request> EngineApiRequestHandler<Request> {
where
T: EngineTypes,
{
/// Creates a new `EngineApiRequestHandler`. /// Creates a new `EngineApiRequestHandler`.
pub const fn new( pub const fn new(
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>, to_tree: Sender<FromEngine<Request>>,
from_tree: UnboundedReceiver<EngineApiEvent>, from_tree: UnboundedReceiver<EngineApiEvent>,
) -> Self { ) -> Self {
Self { to_tree, from_tree } Self { to_tree, from_tree }
} }
} }
impl<T> EngineRequestHandler for EngineApiRequestHandler<T> impl<Request> EngineRequestHandler for EngineApiRequestHandler<Request>
where where
T: EngineTypes, Request: Send,
{ {
type Event = BeaconConsensusEngineEvent; type Event = BeaconConsensusEngineEvent;
type Request = BeaconEngineMessage<T>; type Request = Request;
fn on_event(&mut self, event: FromEngine<Self::Request>) { fn on_event(&mut self, event: FromEngine<Self::Request>) {
// delegate to the tree // delegate to the tree

View File

@ -33,7 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Alias for Ethereum chain orchestrator. /// Alias for Ethereum chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator< type EthServiceType<DB, Client> = ChainOrchestrator<
EngineHandler< EngineHandler<
EngineApiRequestHandler<EthEngineTypes>, EngineApiRequestHandler<BeaconEngineMessage<EthEngineTypes>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>, UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
BasicBlockDownloader<Client>, BasicBlockDownloader<Client>,
>, >,