From b34f23d880f786065654b122e18dc59dc93e0199 Mon Sep 17 00:00:00 2001 From: Tuan Tran Date: Wed, 11 Dec 2024 16:36:37 +0700 Subject: [PATCH] chore: Generic data prims EngineSyncController (#13037) Co-authored-by: Matthias Seitz --- crates/consensus/beacon/src/engine/sync.rs | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 861aeebf1..735441b2e 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -9,8 +9,9 @@ use alloy_primitives::{BlockNumber, B256}; use futures::FutureExt; use reth_network_p2p::{ full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, - EthBlockClient, + BlockClient, }; +use reth_node_types::{BodyTy, HeaderTy}; use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock}; use reth_provider::providers::ProviderNodeTypes; use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult}; @@ -35,7 +36,7 @@ use tracing::trace; pub(crate) struct EngineSyncController where N: ProviderNodeTypes, - Client: EthBlockClient, + Client: BlockClient, { /// A downloader that can download full blocks from the network. full_block_client: FullBlockClient, @@ -51,10 +52,10 @@ where /// In-flight full block _range_ requests in progress. inflight_block_range_requests: Vec>, /// Sender for engine events. - event_sender: EventSender, + event_sender: EventSender>, /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for /// ordering. This means the blocks will be popped from the heap with ascending block numbers. - range_buffered_blocks: BinaryHeap>, + range_buffered_blocks: BinaryHeap, BodyTy>>>, /// Max block after which the consensus engine would terminate the sync. Used for debugging /// purposes. max_block: Option, @@ -65,7 +66,7 @@ where impl EngineSyncController where N: ProviderNodeTypes, - Client: EthBlockClient + 'static, + Client: BlockClient, { /// Create a new instance pub(crate) fn new( @@ -74,7 +75,7 @@ where pipeline_task_spawner: Box, max_block: Option, chain_spec: Arc, - event_sender: EventSender, + event_sender: EventSender>, ) -> Self { Self { full_block_client: FullBlockClient::new( @@ -92,7 +93,13 @@ where metrics: EngineSyncMetrics::default(), } } +} +impl EngineSyncController +where + N: ProviderNodeTypes, + Client: BlockClient
, Body = BodyTy> + 'static, +{ /// Sets the metrics for the active downloads fn update_block_download_metrics(&self) { self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); @@ -234,7 +241,7 @@ where /// Advances the pipeline state. /// /// This checks for the result in the channel, or returns pending if the pipeline is idle. - fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll> { let res = match self.pipeline_state { PipelineState::Idle(_) => return Poll::Pending, PipelineState::Running(ref mut fut) => { @@ -259,7 +266,7 @@ where /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to /// run continuously. - fn try_spawn_pipeline(&mut self) -> Option { + fn try_spawn_pipeline(&mut self) -> Option> { match &mut self.pipeline_state { PipelineState::Idle(pipeline) => { let target = self.pending_pipeline_target.take()?; @@ -286,7 +293,7 @@ where } /// Advances the sync process. - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { // try to spawn a pipeline if a target is set if let Some(event) = self.try_spawn_pipeline() { return Poll::Ready(event) @@ -423,7 +430,7 @@ mod tests { use assert_matches::assert_matches; use futures::poll; use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET}; - use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient}; + use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient}; use reth_primitives::{BlockBody, SealedHeader}; use reth_provider::{ test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},