feat: make engine block downloaders generic over block (#13273)

This commit is contained in:
Arsenii Kulikov
2024-12-10 19:38:37 +04:00
committed by GitHub
parent 4c39b98b62
commit 88a9bd72d4
8 changed files with 74 additions and 60 deletions

View File

@ -64,7 +64,7 @@ where
/// Constructor for [`LocalEngineService`].
#[allow(clippy::too_many_arguments)]
pub fn new<B, V>(
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<N::Primitives>>,
executor_factory: impl BlockExecutorProvider<Primitives = N::Primitives>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider2<N>,
@ -122,7 +122,7 @@ impl<N> Stream for LocalEngineService<N>
where
N: EngineNodeTypes,
{
type Item = ChainEvent<BeaconConsensusEngineEvent>;
type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

View File

@ -16,8 +16,8 @@ pub use reth_engine_tree::{
engine::EngineApiEvent,
};
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::EthBlockClient;
use reth_node_types::{BlockTy, NodeTypes, NodeTypesWithEngine};
use reth_network_p2p::BlockClient;
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine};
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives::EthPrimitives;
use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
@ -42,7 +42,7 @@ type EngineServiceType<N, Client> = ChainOrchestrator<
<N as NodeTypes>::Primitives,
>,
EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
BasicBlockDownloader<Client>,
BasicBlockDownloader<Client, BlockTy<N>>,
>,
PipelineSync<N>,
>;
@ -53,7 +53,7 @@ type EngineServiceType<N, Client> = ChainOrchestrator<
pub struct EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: EthBlockClient + 'static,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
E: BlockExecutorProvider + 'static,
{
orchestrator: EngineServiceType<N, Client>,
@ -63,13 +63,13 @@ where
impl<N, Client, E> EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: EthBlockClient + 'static,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
{
/// Constructor for `EngineService`.
#[allow(clippy::too_many_arguments)]
pub fn new<V>(
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<N::Primitives>>,
executor_factory: E,
chain_spec: Arc<N::ChainSpec>,
client: Client,
@ -131,10 +131,10 @@ where
impl<N, Client, E> Stream for EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: EthBlockClient + 'static,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
E: BlockExecutorProvider + 'static,
{
type Item = ChainEvent<BeaconConsensusEngineEvent>;
type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;

View File

@ -1,14 +1,16 @@
//! Handler that can download blocks on demand (e.g. from the network).
use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use futures::FutureExt;
use reth_consensus::Consensus;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient, EthBlockClient,
BlockClient,
};
use reth_primitives::{SealedBlock, SealedBlockWithSenders};
use reth_primitives::{SealedBlockFor, SealedBlockWithSenders};
use reth_primitives_traits::Block;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
@ -20,11 +22,14 @@ use tracing::trace;
/// A trait that can download blocks on demand.
pub trait BlockDownloader: Send + Sync {
/// Type of the block being downloaded.
type Block: Block;
/// Handle an action.
fn on_action(&mut self, action: DownloadAction);
/// Advance in progress requests if any
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome>;
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
}
/// Actions that can be performed by the block downloader.
@ -38,9 +43,9 @@ pub enum DownloadAction {
/// Outcome of downloaded blocks.
#[derive(Debug)]
pub enum DownloadOutcome {
pub enum DownloadOutcome<B: Block> {
/// Downloaded blocks.
Blocks(Vec<SealedBlockWithSenders>),
Blocks(Vec<SealedBlockWithSenders<B>>),
/// New download started.
NewDownloadStarted {
/// How many blocks are pending in this download.
@ -52,7 +57,7 @@ pub enum DownloadOutcome {
/// Basic [`BlockDownloader`].
#[allow(missing_debug_implementations)]
pub struct BasicBlockDownloader<Client>
pub struct BasicBlockDownloader<Client, B: Block>
where
Client: BlockClient + 'static,
{
@ -64,16 +69,17 @@ where
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders>>,
set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders<B>>>,
/// Engine download metrics.
metrics: BlockDownloaderMetrics,
/// Pending events to be emitted.
pending_events: VecDeque<DownloadOutcome>,
pending_events: VecDeque<DownloadOutcome<B>>,
}
impl<Client> BasicBlockDownloader<Client>
impl<Client, B> BasicBlockDownloader<Client, B>
where
Client: EthBlockClient + 'static,
Client: BlockClient<Header = B::Header, Body = B::Body> + 'static,
B: Block,
{
/// Create a new instance
pub fn new(
@ -174,20 +180,23 @@ where
}
/// Adds a pending event to the FIFO queue.
fn push_pending_event(&mut self, pending_event: DownloadOutcome) {
fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
self.pending_events.push_back(pending_event);
}
/// Removes a pending event from the FIFO queue.
fn pop_pending_event(&mut self) -> Option<DownloadOutcome> {
fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
self.pending_events.pop_front()
}
}
impl<Client> BlockDownloader for BasicBlockDownloader<Client>
impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
where
Client: EthBlockClient,
Client: BlockClient<Header = B::Header, Body = B::Body>,
B: Block,
{
type Block = B;
/// Handles incoming download actions.
fn on_action(&mut self, action: DownloadAction) {
match action {
@ -197,7 +206,7 @@ where
}
/// Advances the download process.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
if let Some(pending_event) = self.pop_pending_event() {
return Poll::Ready(pending_event);
}
@ -244,7 +253,7 @@ where
}
// drain all unique element of the block buffer if there are any
let mut downloaded_blocks: Vec<SealedBlockWithSenders> =
let mut downloaded_blocks: Vec<SealedBlockWithSenders<B>> =
Vec::with_capacity(self.set_buffered_blocks.len());
while let Some(block) = self.set_buffered_blocks.pop() {
// peek ahead and pop duplicates
@ -264,29 +273,29 @@ where
/// A wrapper type around [`SealedBlockWithSenders`] that implements the [Ord]
/// trait by block number.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlockWithSenders(SealedBlockWithSenders);
struct OrderedSealedBlockWithSenders<B: Block>(SealedBlockWithSenders<B>);
impl PartialOrd for OrderedSealedBlockWithSenders {
impl<B: Block> PartialOrd for OrderedSealedBlockWithSenders<B> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for OrderedSealedBlockWithSenders {
impl<B: Block> Ord for OrderedSealedBlockWithSenders<B> {
fn cmp(&self, other: &Self) -> Ordering {
self.0.number.cmp(&other.0.number)
self.0.number().cmp(&other.0.number())
}
}
impl From<SealedBlock> for OrderedSealedBlockWithSenders {
fn from(block: SealedBlock) -> Self {
impl<B: Block> From<SealedBlockFor<B>> for OrderedSealedBlockWithSenders<B> {
fn from(block: SealedBlockFor<B>) -> Self {
let senders = block.senders().unwrap_or_default();
Self(SealedBlockWithSenders { block, senders })
}
}
impl From<OrderedSealedBlockWithSenders> for SealedBlockWithSenders {
fn from(value: OrderedSealedBlockWithSenders) -> Self {
impl<B: Block> From<OrderedSealedBlockWithSenders<B>> for SealedBlockWithSenders<B> {
fn from(value: OrderedSealedBlockWithSenders<B>) -> Self {
let senders = value.0.senders;
Self { block: value.0.block, senders }
}
@ -295,12 +304,14 @@ impl From<OrderedSealedBlockWithSenders> for SealedBlockWithSenders {
/// A [`BlockDownloader`] that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader;
pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);
impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
type Block = B;
impl BlockDownloader for NoopBlockDownloader {
fn on_action(&mut self, _event: DownloadAction) {}
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
Poll::Pending
}
}
@ -318,7 +329,7 @@ mod tests {
use std::{future::poll_fn, sync::Arc};
struct TestHarness {
block_downloader: BasicBlockDownloader<TestFullBlockClient>,
block_downloader: BasicBlockDownloader<TestFullBlockClient, reth_primitives::Block>,
client: TestFullBlockClient,
}
@ -385,7 +396,7 @@ mod tests {
// ensure they are in ascending order
for num in 1..=TOTAL_BLOCKS {
assert_eq!(blocks[num-1].number, num as u64);
assert_eq!(blocks[num-1].number(), num as u64);
}
});
}
@ -423,7 +434,7 @@ mod tests {
// ensure they are in ascending order
for num in 1..=TOTAL_BLOCKS {
assert_eq!(blocks[num-1].number, num as u64);
assert_eq!(blocks[num-1].number(), num as u64);
}
});
}

View File

@ -67,7 +67,7 @@ impl<T, S, D> EngineHandler<T, S, D> {
impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
where
T: EngineRequestHandler<Block = reth_primitives::Block>,
T: EngineRequestHandler<Block = D::Block>,
S: Stream + Send + Sync + Unpin + 'static,
<S as Stream>::Item: Into<T::Request>,
D: BlockDownloader,