From aece53ae88990bb8d0849b60e9649f68781dd03e Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Tue, 12 Nov 2024 19:13:21 +0400 Subject: [PATCH] feat: make downloaders and clients generic over block parts (#12469) Co-authored-by: Matthias Seitz --- Cargo.lock | 6 +- bin/reth/Cargo.toml | 1 + bin/reth/src/commands/debug_cmd/execution.rs | 11 +- .../commands/debug_cmd/in_memory_merkle.rs | 4 +- crates/consensus/beacon/src/engine/mod.rs | 8 +- crates/consensus/beacon/src/engine/sync.rs | 8 +- .../consensus/beacon/src/engine/test_utils.rs | 12 +- crates/engine/service/src/service.rs | 8 +- crates/engine/tree/src/download.rs | 12 +- crates/net/downloaders/Cargo.toml | 1 + crates/net/downloaders/src/bodies/bodies.rs | 48 +++--- crates/net/downloaders/src/bodies/noop.rs | 5 +- crates/net/downloaders/src/bodies/queue.rs | 7 +- crates/net/downloaders/src/bodies/request.rs | 20 ++- crates/net/downloaders/src/bodies/task.rs | 30 ++-- crates/net/downloaders/src/file_client.rs | 2 + crates/net/downloaders/src/headers/noop.rs | 5 +- .../src/headers/reverse_headers.rs | 161 ++++++++++-------- crates/net/downloaders/src/headers/task.rs | 40 +++-- .../src/test_utils/bodies_client.rs | 1 + crates/net/eth-wire-types/Cargo.toml | 1 + crates/net/eth-wire-types/src/message.rs | 2 +- crates/net/eth-wire-types/src/primitives.rs | 4 +- crates/net/eth-wire/src/capability.rs | 11 +- crates/net/eth-wire/src/ethstream.rs | 102 ++++++----- crates/net/eth-wire/src/multiplex.rs | 16 +- crates/net/network-api/src/downloaders.rs | 7 +- crates/net/network-api/src/events.rs | 69 ++++++-- crates/net/network-api/src/lib.rs | 5 +- crates/net/network/Cargo.toml | 1 + crates/net/network/src/fetch/client.rs | 18 +- crates/net/network/src/fetch/mod.rs | 54 +++--- crates/net/network/src/message.rs | 27 +-- crates/net/network/src/network.rs | 9 +- crates/net/network/src/state.rs | 29 ++-- crates/net/p2p/Cargo.toml | 4 +- crates/net/p2p/src/bodies/client.rs | 13 +- crates/net/p2p/src/bodies/downloader.rs | 9 +- crates/net/p2p/src/bodies/response.rs | 11 +- crates/net/p2p/src/either.rs | 6 +- crates/net/p2p/src/error.rs | 9 +- crates/net/p2p/src/full_block.rs | 92 +++++----- crates/net/p2p/src/headers/client.rs | 10 +- crates/net/p2p/src/headers/downloader.rs | 25 ++- crates/net/p2p/src/headers/error.rs | 8 +- crates/net/p2p/src/lib.rs | 11 ++ crates/net/p2p/src/test_utils/bodies.rs | 1 + crates/net/p2p/src/test_utils/full_block.rs | 4 + crates/net/p2p/src/test_utils/headers.rs | 5 +- crates/node/builder/src/launch/common.rs | 2 +- crates/node/builder/src/setup.rs | 8 +- crates/node/core/Cargo.toml | 3 +- crates/node/core/src/node_config.rs | 11 +- crates/node/core/src/utils.rs | 21 ++- crates/primitives-traits/src/block/header.rs | 6 +- crates/primitives-traits/src/header/sealed.rs | 4 +- crates/primitives-traits/src/size.rs | 6 + crates/primitives/src/block.rs | 3 +- crates/stages/stages/src/stages/bodies.rs | 9 +- crates/stages/stages/src/stages/headers.rs | 14 +- 60 files changed, 631 insertions(+), 409 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 798852f80..4792cb094 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6391,6 +6391,7 @@ dependencies = [ "reth-payload-primitives", "reth-payload-validator", "reth-primitives", + "reth-primitives-traits", "reth-provider", "reth-prune", "reth-revm", @@ -7029,6 +7030,7 @@ dependencies = [ name = "reth-downloaders" version = "1.1.1" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-primitives", "alloy-rlp", @@ -7829,6 +7831,7 @@ dependencies = [ name = "reth-network-p2p" version = "1.1.1" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-primitives", "auto_impl", @@ -7997,7 +8000,7 @@ dependencies = [ "reth-chainspec", "reth-cli-util", "reth-config", - "reth-consensus-common", + "reth-consensus", "reth-db", "reth-discv4", "reth-discv5", @@ -8006,6 +8009,7 @@ dependencies = [ "reth-network-p2p", "reth-network-peers", "reth-primitives", + "reth-primitives-traits", "reth-prune-types", "reth-rpc-eth-types", "reth-rpc-server-types", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index ffd1998b2..a152bea26 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -19,6 +19,7 @@ reth-ethereum-cli.workspace = true reth-chainspec.workspace = true reth-config.workspace = true reth-primitives.workspace = true +reth-primitives-traits.workspace = true reth-fs-util.workspace = true reth-db = { workspace = true, features = ["mdbx"] } reth-db-api.workspace = true diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index cc584c892..9056d3424 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -21,7 +21,7 @@ use reth_downloaders::{ use reth_exex::ExExManagerHandle; use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle}; use reth_network_api::NetworkInfo; -use reth_network_p2p::{headers::client::HeadersClient, BlockClient}; +use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient}; use reth_node_api::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine}; use reth_node_ethereum::EthExecutorProvider; use reth_provider::{ @@ -68,7 +68,7 @@ impl> Command { static_file_producer: StaticFileProducer>, ) -> eyre::Result> where - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) @@ -137,11 +137,14 @@ impl> Command { Ok(network) } - async fn fetch_block_hash( + async fn fetch_block_hash( &self, client: Client, block: BlockNumber, - ) -> eyre::Result { + ) -> eyre::Result + where + Client: HeadersClient, + { info!(target: "reth::cli", ?block, "Fetching block from the network."); loop { match get_single_header(&client, BlockHashOrNumber::Number(block)).await { diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index 2c56da9b4..bc36578a3 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -7,6 +7,7 @@ use crate::{ use alloy_eips::BlockHashOrNumber; use backon::{ConstantBuilder, Retryable}; use clap::Parser; +use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::ChainSpec; use reth_cli::chainspec::ChainSpecParser; use reth_cli_commands::common::{AccessRights, Environment, EnvironmentArgs}; @@ -124,7 +125,8 @@ impl> Command { let client = fetch_client.clone(); let chain = provider_factory.chain_spec(); - let block = (move || get_single_body(client.clone(), Arc::clone(&chain), header.clone())) + let consensus = Arc::new(EthBeaconConsensus::new(chain.clone())); + let block = (move || get_single_body(client.clone(), header.clone(), consensus.clone())) .retry(backoff) .notify( |err, _| warn!(target: "reth::cli", "Error requesting body: {err}. Retrying..."), diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 65904196e..8d385a64b 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -14,7 +14,7 @@ use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes, PayloadTypes} use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult}; use reth_network_p2p::{ sync::{NetworkSyncUpdater, SyncState}, - BlockClient, + EthBlockClient, }; use reth_node_types::NodeTypesWithEngine; use reth_payload_builder::PayloadBuilderHandle; @@ -174,7 +174,7 @@ type PendingForkchoiceUpdate = pub struct BeaconConsensusEngine where N: EngineNodeTypes, - Client: BlockClient, + Client: EthBlockClient, BT: BlockchainTreeEngine + BlockReader + BlockIdReader @@ -237,7 +237,7 @@ where + StageCheckpointReader + ChainSpecProvider + 'static, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { /// Create a new instance of the [`BeaconConsensusEngine`]. #[allow(clippy::too_many_arguments)] @@ -1799,7 +1799,7 @@ where impl Future for BeaconConsensusEngine where N: EngineNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, BT: BlockchainTreeEngine + BlockReader + BlockIdReader diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 9426ca197..17d5d2281 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -8,7 +8,7 @@ use alloy_primitives::{BlockNumber, B256}; use futures::FutureExt; use reth_network_p2p::{ full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, - BlockClient, + EthBlockClient, }; use reth_primitives::SealedBlock; use reth_provider::providers::ProviderNodeTypes; @@ -34,7 +34,7 @@ use tracing::trace; pub(crate) struct EngineSyncController where N: ProviderNodeTypes, - Client: BlockClient, + Client: EthBlockClient, { /// A downloader that can download full blocks from the network. full_block_client: FullBlockClient, @@ -64,7 +64,7 @@ where impl EngineSyncController where N: ProviderNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { /// Create a new instance pub(crate) fn new( @@ -522,7 +522,7 @@ mod tests { ) -> EngineSyncController> where N: ProviderNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { let client = self .client diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 6e03aebfa..3c69e7f55 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -24,7 +24,9 @@ use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::{either::Either, test_utils::MockExecutorProvider}; use reth_evm_ethereum::execute::EthExecutorProvider; use reth_exex_types::FinishedExExHeight; -use reth_network_p2p::{sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClient, BlockClient}; +use reth_network_p2p::{ + sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClient, EthBlockClient, +}; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::SealedHeader; use reth_provider::{ @@ -237,7 +239,7 @@ impl TestConsensusEngineBuilder { client: Client, ) -> NetworkedTestConsensusEngineBuilder where - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { NetworkedTestConsensusEngineBuilder { base_config: self, client: Some(client) } } @@ -264,7 +266,7 @@ pub struct NetworkedTestConsensusEngineBuilder { impl NetworkedTestConsensusEngineBuilder where - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { /// Set the pipeline execution outputs to use for the test consensus engine. #[allow(dead_code)] @@ -319,7 +321,7 @@ where client: ClientType, ) -> NetworkedTestConsensusEngineBuilder where - ClientType: BlockClient + 'static, + ClientType: EthBlockClient + 'static, { NetworkedTestConsensusEngineBuilder { base_config: self.base_config, client: Some(client) } } @@ -450,7 +452,7 @@ pub fn spawn_consensus_engine( engine: TestBeaconConsensusEngine, ) -> oneshot::Receiver> where - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 198438d45..d383af6ca 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -15,7 +15,7 @@ pub use reth_engine_tree::{ engine::EngineApiEvent, }; use reth_evm::execute::BlockExecutorProvider; -use reth_network_p2p::BlockClient; +use reth_network_p2p::EthBlockClient; use reth_node_types::NodeTypesWithEngine; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_validator::ExecutionPayloadValidator; @@ -49,7 +49,7 @@ type EngineServiceType = ChainOrchestrator< pub struct EngineService where N: EngineNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, E: BlockExecutorProvider + 'static, { orchestrator: EngineServiceType, @@ -59,7 +59,7 @@ where impl EngineService where N: EngineNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, E: BlockExecutorProvider + 'static, { /// Constructor for `EngineService`. @@ -124,7 +124,7 @@ where impl Stream for EngineService where N: EngineNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, E: BlockExecutorProvider + 'static, { type Item = ChainEvent; diff --git a/crates/engine/tree/src/download.rs b/crates/engine/tree/src/download.rs index 9ecec70ae..667808a4d 100644 --- a/crates/engine/tree/src/download.rs +++ b/crates/engine/tree/src/download.rs @@ -6,12 +6,13 @@ use futures::FutureExt; use reth_consensus::Consensus; use reth_network_p2p::{ full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, - BlockClient, + BlockClient, EthBlockClient, }; use reth_primitives::{SealedBlock, SealedBlockWithSenders}; use std::{ cmp::{Ordering, Reverse}, collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque}, + fmt::Debug, sync::Arc, task::{Context, Poll}, }; @@ -72,10 +73,13 @@ where impl BasicBlockDownloader where - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, { /// Create a new instance - pub fn new(client: Client, consensus: Arc) -> Self { + pub fn new( + client: Client, + consensus: Arc>, + ) -> Self { Self { full_block_client: FullBlockClient::new(client, consensus), inflight_full_block_requests: Vec::new(), @@ -182,7 +186,7 @@ where impl BlockDownloader for BasicBlockDownloader where - Client: BlockClient + 'static, + Client: EthBlockClient, { /// Handles incoming download actions. fn on_action(&mut self, action: DownloadAction) { diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index 69a59f698..38e46bb60 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -28,6 +28,7 @@ reth-db-api = { workspace = true, optional = true } reth-testing-utils = { workspace = true, optional = true } # ethereum +alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rlp.workspace = true diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index af113fdb3..02a36c8e8 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -37,7 +37,7 @@ pub struct BodiesDownloader { /// The bodies client client: Arc, /// The consensus client - consensus: Arc, + consensus: Arc>, /// The database handle provider: Provider, /// The maximum number of non-empty blocks per one request @@ -57,16 +57,16 @@ pub struct BodiesDownloader { /// Requests in progress in_progress_queue: BodiesRequestQueue, /// Buffered responses - buffered_responses: BinaryHeap, + buffered_responses: BinaryHeap>, /// Queued body responses that can be returned for insertion into the database. - queued_bodies: Vec, + queued_bodies: Vec>, /// The bodies downloader metrics. metrics: BodyDownloaderMetrics, } impl BodiesDownloader where - B: BodiesClient + 'static, + B: BodiesClient + 'static, Provider: HeaderProvider + Unpin + 'static, { /// Returns the next contiguous request. @@ -191,14 +191,14 @@ where } /// Queues bodies and sets the latest queued block number - fn queue_bodies(&mut self, bodies: Vec) { + fn queue_bodies(&mut self, bodies: Vec>) { self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number()); self.queued_bodies.extend(bodies); self.metrics.queued_blocks.set(self.queued_bodies.len() as f64); } /// Removes the next response from the buffer. - fn pop_buffered_response(&mut self) -> Option { + fn pop_buffered_response(&mut self) -> Option> { let resp = self.buffered_responses.pop()?; self.metrics.buffered_responses.decrement(1.); self.buffered_blocks_size_bytes -= resp.size(); @@ -208,10 +208,10 @@ where } /// Adds a new response to the internal buffer - fn buffer_bodies_response(&mut self, response: Vec) { + fn buffer_bodies_response(&mut self, response: Vec>) { // take into account capacity let size = response.iter().map(BlockResponse::size).sum::() + - response.capacity() * mem::size_of::(); + response.capacity() * mem::size_of::>(); let response = OrderedBodiesResponse { resp: response, size }; let response_len = response.len(); @@ -225,7 +225,7 @@ where } /// Returns a response if it's first block number matches the next expected. - fn try_next_buffered(&mut self) -> Option> { + fn try_next_buffered(&mut self) -> Option>> { if let Some(next) = self.buffered_responses.peek() { let expected = self.next_expected_block_number(); let next_block_range = next.block_range(); @@ -251,7 +251,7 @@ where /// Returns the next batch of block bodies that can be returned if we have enough buffered /// bodies - fn try_split_next_batch(&mut self) -> Option> { + fn try_split_next_batch(&mut self) -> Option>> { if self.queued_bodies.len() >= self.stream_batch_size { let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::>(); self.queued_bodies.shrink_to_fit(); @@ -283,12 +283,12 @@ where Self: BodyDownloader + 'static, { /// Spawns the downloader task via [`tokio::task::spawn`] - pub fn into_task(self) -> TaskDownloader { + pub fn into_task(self) -> TaskDownloader<::Body> { self.into_task_with(&TokioTaskExecutor::default()) } /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner. - pub fn into_task_with(self, spawner: &S) -> TaskDownloader + pub fn into_task_with(self, spawner: &S) -> TaskDownloader<::Body> where S: TaskSpawner, { @@ -298,9 +298,11 @@ where impl BodyDownloader for BodiesDownloader where - B: BodiesClient + 'static, + B: BodiesClient + 'static, Provider: HeaderProvider + Unpin + 'static, { + type Body = B::Body; + /// Set a new download range (exclusive). /// /// This method will drain all queued bodies, filter out ones outside the range and put them @@ -346,10 +348,10 @@ where impl Stream for BodiesDownloader where - B: BodiesClient + 'static, + B: BodiesClient + 'static, Provider: HeaderProvider + Unpin + 'static, { - type Item = BodyDownloaderResult; + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -431,13 +433,13 @@ where } #[derive(Debug)] -struct OrderedBodiesResponse { - resp: Vec, +struct OrderedBodiesResponse { + resp: Vec>, /// The total size of the response in bytes size: usize, } -impl OrderedBodiesResponse { +impl OrderedBodiesResponse { /// Returns the block number of the first element /// /// # Panics @@ -468,21 +470,21 @@ impl OrderedBodiesResponse { } } -impl PartialEq for OrderedBodiesResponse { +impl PartialEq for OrderedBodiesResponse { fn eq(&self, other: &Self) -> bool { self.first_block_number() == other.first_block_number() } } -impl Eq for OrderedBodiesResponse {} +impl Eq for OrderedBodiesResponse {} -impl PartialOrd for OrderedBodiesResponse { +impl PartialOrd for OrderedBodiesResponse { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for OrderedBodiesResponse { +impl Ord for OrderedBodiesResponse { fn cmp(&self, other: &Self) -> Ordering { self.first_block_number().cmp(&other.first_block_number()).reverse() } @@ -562,7 +564,7 @@ impl BodiesDownloaderBuilder { pub fn build( self, client: B, - consensus: Arc, + consensus: Arc>, provider: Provider, ) -> BodiesDownloader where diff --git a/crates/net/downloaders/src/bodies/noop.rs b/crates/net/downloaders/src/bodies/noop.rs index e70c534a0..494a5f2ef 100644 --- a/crates/net/downloaders/src/bodies/noop.rs +++ b/crates/net/downloaders/src/bodies/noop.rs @@ -4,6 +4,7 @@ use reth_network_p2p::{ bodies::{downloader::BodyDownloader, response::BlockResponse}, error::{DownloadError, DownloadResult}, }; +use reth_primitives::BlockBody; use std::ops::RangeInclusive; /// A [`BodyDownloader`] implementation that does nothing. @@ -12,13 +13,15 @@ use std::ops::RangeInclusive; pub struct NoopBodiesDownloader; impl BodyDownloader for NoopBodiesDownloader { + type Body = BlockBody; + fn set_download_range(&mut self, _: RangeInclusive) -> DownloadResult<()> { Ok(()) } } impl Stream for NoopBodiesDownloader { - type Item = Result, DownloadError>; + type Item = Result>, DownloadError>; fn poll_next( self: std::pin::Pin<&mut Self>, diff --git a/crates/net/downloaders/src/bodies/queue.rs b/crates/net/downloaders/src/bodies/queue.rs index db7ff71cf..54404d0da 100644 --- a/crates/net/downloaders/src/bodies/queue.rs +++ b/crates/net/downloaders/src/bodies/queue.rs @@ -9,6 +9,7 @@ use reth_network_p2p::{ error::DownloadResult, }; use reth_primitives::SealedHeader; +use reth_primitives_traits::InMemorySize; use std::{ pin::Pin, sync::Arc, @@ -57,7 +58,7 @@ where pub(crate) fn push_new_request( &mut self, client: Arc, - consensus: Arc, + consensus: Arc>, request: Vec, ) { // Set last max requested block number @@ -77,9 +78,9 @@ where impl Stream for BodiesRequestQueue where - B: BodiesClient + 'static, + B: BodiesClient + 'static, { - type Item = DownloadResult>; + type Item = DownloadResult>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().inner.poll_next_unpin(cx) diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index 5ab44ed08..7b99c81d8 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -39,7 +39,7 @@ use std::{ /// and eventually disconnected. pub(crate) struct BodiesRequestFuture { client: Arc, - consensus: Arc, + consensus: Arc>, metrics: BodyDownloaderMetrics, /// Metrics for individual responses. This can be used to observe how the size (in bytes) of /// responses change while bodies are being downloaded. @@ -47,7 +47,7 @@ pub(crate) struct BodiesRequestFuture { // Headers to download. The collection is shrunk as responses are buffered. pending_headers: VecDeque, /// Internal buffer for all blocks - buffer: Vec, + buffer: Vec>, fut: Option, /// Tracks how many bodies we requested in the last request. last_request_len: Option, @@ -60,7 +60,7 @@ where /// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request. pub(crate) fn new( client: Arc, - consensus: Arc, + consensus: Arc>, metrics: BodyDownloaderMetrics, ) -> Self { Self { @@ -115,7 +115,10 @@ where /// Process block response. /// Returns an error if the response is invalid. - fn on_block_response(&mut self, response: WithPeerId>) -> DownloadResult<()> { + fn on_block_response(&mut self, response: WithPeerId>) -> DownloadResult<()> + where + B::Body: InMemorySize, + { let (peer_id, bodies) = response.split(); let request_len = self.last_request_len.unwrap_or_default(); let response_len = bodies.len(); @@ -158,7 +161,10 @@ where /// /// This method removes headers from the internal collection. /// If the response fails validation, then the header will be put back. - fn try_buffer_blocks(&mut self, bodies: Vec) -> DownloadResult<()> { + fn try_buffer_blocks(&mut self, bodies: Vec) -> DownloadResult<()> + where + B::Body: InMemorySize, + { let bodies_capacity = bodies.capacity(); let bodies_len = bodies.len(); let mut bodies = bodies.into_iter().peekable(); @@ -208,9 +214,9 @@ where impl Future for BodiesRequestFuture where - B: BodiesClient + 'static, + B: BodiesClient + 'static, { - type Output = DownloadResult>; + type Output = DownloadResult>>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index eeafb7ab1..2caf31991 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -23,15 +23,15 @@ pub const BODIES_TASK_BUFFER_SIZE: usize = 4; /// A [BodyDownloader] that drives a spawned [BodyDownloader] on a spawned task. #[derive(Debug)] #[pin_project] -pub struct TaskDownloader { +pub struct TaskDownloader { #[pin] - from_downloader: ReceiverStream, + from_downloader: ReceiverStream>, to_downloader: UnboundedSender>, } // === impl TaskDownloader === -impl TaskDownloader { +impl TaskDownloader { /// Spawns the given `downloader` via [`tokio::task::spawn`] returns a [`TaskDownloader`] that's /// connected to that task. /// @@ -45,12 +45,16 @@ impl TaskDownloader { /// use reth_consensus::Consensus; /// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader}; /// use reth_network_p2p::bodies::client::BodiesClient; + /// use reth_primitives_traits::InMemorySize; /// use reth_storage_api::HeaderProvider; /// use std::sync::Arc; /// - /// fn t( + /// fn t< + /// B: BodiesClient + 'static, + /// Provider: HeaderProvider + Unpin + 'static, + /// >( /// client: Arc, - /// consensus: Arc, + /// consensus: Arc>, /// provider: Provider, /// ) { /// let downloader = BodiesDownloaderBuilder::default().build(client, consensus, provider); @@ -59,7 +63,7 @@ impl TaskDownloader { /// ``` pub fn spawn(downloader: T) -> Self where - T: BodyDownloader + 'static, + T: BodyDownloader + 'static, { Self::spawn_with(downloader, &TokioTaskExecutor::default()) } @@ -68,7 +72,7 @@ impl TaskDownloader { /// that's connected to that task. pub fn spawn_with(downloader: T, spawner: &S) -> Self where - T: BodyDownloader + 'static, + T: BodyDownloader + 'static, S: TaskSpawner, { let (bodies_tx, bodies_rx) = mpsc::channel(BODIES_TASK_BUFFER_SIZE); @@ -86,15 +90,17 @@ impl TaskDownloader { } } -impl BodyDownloader for TaskDownloader { +impl BodyDownloader for TaskDownloader { + type Body = B; + fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()> { let _ = self.to_downloader.send(range); Ok(()) } } -impl Stream for TaskDownloader { - type Item = BodyDownloaderResult; +impl Stream for TaskDownloader { + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().from_downloader.poll_next(cx) @@ -102,9 +108,9 @@ impl Stream for TaskDownloader { } /// A [`BodyDownloader`] that runs on its own task -struct SpawnedDownloader { +struct SpawnedDownloader { updates: UnboundedReceiverStream>, - bodies_tx: PollSender, + bodies_tx: PollSender>, downloader: T, } diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index 5b21c82fb..df35146e9 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -265,6 +265,7 @@ impl FromReader for FileClient { } impl HeadersClient for FileClient { + type Header = Header; type Output = HeadersFut; fn get_headers_with_priority( @@ -315,6 +316,7 @@ impl HeadersClient for FileClient { } impl BodiesClient for FileClient { + type Body = BlockBody; type Output = BodiesFut; fn get_block_bodies_with_priority( diff --git a/crates/net/downloaders/src/headers/noop.rs b/crates/net/downloaders/src/headers/noop.rs index 210655f7e..58da73123 100644 --- a/crates/net/downloaders/src/headers/noop.rs +++ b/crates/net/downloaders/src/headers/noop.rs @@ -1,3 +1,4 @@ +use alloy_consensus::Header; use futures::Stream; use reth_network_p2p::headers::{ downloader::{HeaderDownloader, SyncTarget}, @@ -11,6 +12,8 @@ use reth_primitives::SealedHeader; pub struct NoopHeaderDownloader; impl HeaderDownloader for NoopHeaderDownloader { + type Header = Header; + fn update_local_head(&mut self, _: SealedHeader) {} fn update_sync_target(&mut self, _: SyncTarget) {} @@ -19,7 +22,7 @@ impl HeaderDownloader for NoopHeaderDownloader { } impl Stream for NoopHeaderDownloader { - type Item = Result, HeadersDownloaderError>; + type Item = Result, HeadersDownloaderError
>; fn poll_next( self: std::pin::Pin<&mut Self>, diff --git a/crates/net/downloaders/src/headers/reverse_headers.rs b/crates/net/downloaders/src/headers/reverse_headers.rs index f0c28dc5d..9532b4b3a 100644 --- a/crates/net/downloaders/src/headers/reverse_headers.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -2,6 +2,7 @@ use super::task::TaskDownloader; use crate::metrics::HeaderDownloaderMetrics; +use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use alloy_primitives::{BlockNumber, Sealable, B256}; use futures::{stream::Stream, FutureExt}; @@ -19,7 +20,7 @@ use reth_network_p2p::{ priority::Priority, }; use reth_network_peers::PeerId; -use reth_primitives::{GotExpected, Header, SealedHeader}; +use reth_primitives::{GotExpected, SealedHeader}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ cmp::{Ordering, Reverse}, @@ -39,14 +40,14 @@ const REQUESTS_PER_PEER_MULTIPLIER: usize = 5; /// Wrapper for internal downloader errors. #[derive(Error, Debug)] -enum ReverseHeadersDownloaderError { +enum ReverseHeadersDownloaderError { #[error(transparent)] - Downloader(#[from] HeadersDownloaderError), + Downloader(#[from] HeadersDownloaderError), #[error(transparent)] Response(#[from] Box), } -impl From for ReverseHeadersDownloaderError { +impl From for ReverseHeadersDownloaderError { fn from(value: HeadersResponseError) -> Self { Self::Response(Box::new(value)) } @@ -66,17 +67,17 @@ impl From for ReverseHeadersDownloaderError { #[derive(Debug)] pub struct ReverseHeadersDownloader { /// Consensus client used to validate headers - consensus: Arc, + consensus: Arc>, /// Client used to download headers. client: Arc, /// The local head of the chain. - local_head: Option, + local_head: Option>, /// Block we want to close the gap to. sync_target: Option, /// The block number to use for requests. next_request_block_number: u64, /// Keeps track of the block we need to validate next. - lowest_validated_header: Option, + lowest_validated_header: Option>, /// Tip block number to start validating from (in reverse) next_chain_tip_block_number: u64, /// The batch size per one request @@ -97,11 +98,11 @@ pub struct ReverseHeadersDownloader { /// requests in progress in_progress_queue: FuturesUnordered>, /// Buffered, unvalidated responses - buffered_responses: BinaryHeap, + buffered_responses: BinaryHeap>, /// Buffered, _sorted_ and validated headers ready to be returned. /// /// Note: headers are sorted from high to low - queued_validated_headers: Vec, + queued_validated_headers: Vec>, /// Header downloader metrics. metrics: HeaderDownloaderMetrics, } @@ -110,7 +111,7 @@ pub struct ReverseHeadersDownloader { impl ReverseHeadersDownloader where - H: HeadersClient + 'static, + H: HeadersClient + 'static, { /// Convenience method to create a [`ReverseHeadersDownloaderBuilder`] without importing it pub fn builder() -> ReverseHeadersDownloaderBuilder { @@ -120,7 +121,7 @@ where /// Returns the block number the local node is at. #[inline] fn local_block_number(&self) -> Option { - self.local_head.as_ref().map(|h| h.number) + self.local_head.as_ref().map(|h| h.number()) } /// Returns the existing local head block number @@ -130,7 +131,7 @@ where /// If the local head has not been set. #[inline] fn existing_local_block_number(&self) -> BlockNumber { - self.local_head.as_ref().expect("is initialized").number + self.local_head.as_ref().expect("is initialized").number() } /// Returns the existing sync target. @@ -197,14 +198,14 @@ where /// `lowest_validated_header`. /// /// This only returns `None` if we haven't fetched the initial chain tip yet. - fn lowest_validated_header(&self) -> Option<&SealedHeader> { + fn lowest_validated_header(&self) -> Option<&SealedHeader> { self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref()) } /// Validate that the received header matches the expected sync target. fn validate_sync_target( &self, - header: &SealedHeader, + header: &SealedHeader, request: HeadersRequest, peer_id: PeerId, ) -> Result<(), Box> { @@ -220,12 +221,12 @@ where ), })) } - SyncTargetBlock::Number(number) if header.number != number => { + SyncTargetBlock::Number(number) if header.number() != number => { Err(Box::new(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::InvalidTipNumber(GotExpected { - got: header.number, + got: header.number(), expected: number, }), })) @@ -244,9 +245,9 @@ where fn process_next_headers( &mut self, request: HeadersRequest, - headers: Vec
, + headers: Vec, peer_id: PeerId, - ) -> Result<(), ReverseHeadersDownloaderError> { + ) -> Result<(), ReverseHeadersDownloaderError> { let mut validated = Vec::with_capacity(headers.len()); let sealed_headers = headers @@ -280,17 +281,17 @@ where if let Some((last_header, head)) = validated .last_mut() .zip(self.local_head.as_ref()) - .filter(|(last, head)| last.number == head.number + 1) + .filter(|(last, head)| last.number() == head.number() + 1) { // Every header must be valid on its own - if let Err(error) = self.consensus.validate_header(last_header) { + if let Err(error) = self.consensus.validate_header(&*last_header) { trace!(target: "downloaders::headers", %error, "Failed to validate header"); return Err(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::HeaderValidation { hash: head.hash(), - number: head.number, + number: head.number(), error: Box::new(error), }, } @@ -299,9 +300,9 @@ where // If the header is valid on its own, but not against its parent, we return it as // detached head error. - if let Err(error) = self.consensus.validate_header_against_parent(last_header, head) { + if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) { // Replace the last header with a detached variant - error!(target: "downloaders::headers", %error, number = last_header.number, hash = ?last_header.hash(), "Header cannot be attached to known canonical chain"); + error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain"); return Err(HeadersDownloaderError::DetachedHead { local_head: Box::new(head.clone()), header: Box::new(last_header.clone()), @@ -313,7 +314,7 @@ where // update tracked block info (falling block number) self.next_chain_tip_block_number = - validated.last().expect("exists").number.saturating_sub(1); + validated.last().expect("exists").number().saturating_sub(1); self.queued_validated_headers.extend(validated); Ok(()) @@ -345,7 +346,7 @@ where let skip = self .queued_validated_headers .iter() - .take_while(|last| last.number > target_block_number) + .take_while(|last| last.number() > target_block_number) .count(); // removes all headers that are higher than current target self.queued_validated_headers.drain(..skip); @@ -360,8 +361,8 @@ where /// Handles the response for the request for the sync target fn on_sync_target_outcome( &mut self, - response: HeadersRequestOutcome, - ) -> Result<(), ReverseHeadersDownloaderError> { + response: HeadersRequestOutcome, + ) -> Result<(), ReverseHeadersDownloaderError> { let sync_target = self.existing_sync_target(); let HeadersRequestOutcome { request, outcome } = response; match outcome { @@ -372,7 +373,7 @@ where self.metrics.total_downloaded.increment(headers.len() as u64); // sort headers from highest to lowest block number - headers.sort_unstable_by_key(|h| Reverse(h.number)); + headers.sort_unstable_by_key(|h| Reverse(h.number())); if headers.is_empty() { return Err(HeadersResponseError { @@ -401,12 +402,12 @@ where } } SyncTargetBlock::Number(number) => { - if target.number != number { + if target.number() != number { return Err(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::InvalidTipNumber(GotExpected { - got: target.number, + got: target.number(), expected: number, }), } @@ -415,17 +416,17 @@ where } } - trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number, "Received sync target"); + trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target"); // This is the next block we need to start issuing requests from - let parent_block_number = target.number.saturating_sub(1); - self.on_block_number_update(target.number, parent_block_number); + let parent_block_number = target.number().saturating_sub(1); + self.on_block_number_update(target.number(), parent_block_number); self.queued_validated_headers.push(target); // try to validate all buffered responses blocked by this successful response self.try_validate_buffered() - .map(Err::<(), ReverseHeadersDownloaderError>) + .map(Err::<(), ReverseHeadersDownloaderError>) .transpose()?; Ok(()) @@ -439,8 +440,8 @@ where /// Invoked when we received a response fn on_headers_outcome( &mut self, - response: HeadersRequestOutcome, - ) -> Result<(), ReverseHeadersDownloaderError> { + response: HeadersRequestOutcome, + ) -> Result<(), ReverseHeadersDownloaderError> { let requested_block_number = response.block_number(); let HeadersRequestOutcome { request, outcome } = response; @@ -475,19 +476,19 @@ where } // sort headers from highest to lowest block number - headers.sort_unstable_by_key(|h| Reverse(h.number)); + headers.sort_unstable_by_key(|h| Reverse(h.number())); // validate the response let highest = &headers[0]; - trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number, "Validating non-empty headers response"); + trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response"); - if highest.number != requested_block_number { + if highest.number() != requested_block_number { return Err(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected { - got: highest.number, + got: highest.number(), expected: requested_block_number, }), } @@ -495,14 +496,14 @@ where } // check if the response is the next expected - if highest.number == self.next_chain_tip_block_number { + if highest.number() == self.next_chain_tip_block_number { // is next response, validate it self.process_next_headers(request, headers, peer_id)?; // try to validate all buffered responses blocked by this successful response self.try_validate_buffered() - .map(Err::<(), ReverseHeadersDownloaderError>) + .map(Err::<(), ReverseHeadersDownloaderError>) .transpose()?; - } else if highest.number > self.existing_local_block_number() { + } else if highest.number() > self.existing_local_block_number() { self.metrics.buffered_responses.increment(1.); // can't validate yet self.buffered_responses.push(OrderedHeadersResponse { @@ -549,7 +550,7 @@ where /// Attempts to validate the buffered responses /// /// Returns an error if the next expected response was popped, but failed validation. - fn try_validate_buffered(&mut self) -> Option { + fn try_validate_buffered(&mut self) -> Option> { loop { // Check to see if we've already received the next value let next_response = self.buffered_responses.peek_mut()?; @@ -598,7 +599,11 @@ where } /// Validate whether the header is valid in relation to it's parent - fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> { + fn validate( + &self, + header: &SealedHeader, + parent: &SealedHeader, + ) -> DownloadResult<()> { validate_header_download(&self.consensus, header, parent) } @@ -614,7 +619,7 @@ where } /// Splits off the next batch of headers - fn split_next_batch(&mut self) -> Vec { + fn split_next_batch(&mut self) -> Vec> { let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len()); let mut rem = self.queued_validated_headers.split_off(batch_size); std::mem::swap(&mut rem, &mut self.queued_validated_headers); @@ -644,12 +649,15 @@ where Self: HeaderDownloader + 'static, { /// Spawns the downloader task via [`tokio::task::spawn`] - pub fn into_task(self) -> TaskDownloader { + pub fn into_task(self) -> TaskDownloader<::Header> { self.into_task_with(&TokioTaskExecutor::default()) } /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`. - pub fn into_task_with(self, spawner: &S) -> TaskDownloader + pub fn into_task_with( + self, + spawner: &S, + ) -> TaskDownloader<::Header> where S: TaskSpawner, { @@ -659,11 +667,17 @@ where impl HeaderDownloader for ReverseHeadersDownloader where - H: HeadersClient + 'static, + H: HeadersClient + 'static, { - fn update_local_head(&mut self, head: SealedHeader) { + type Header = H::Header; + + fn update_local_head(&mut self, head: SealedHeader) { // ensure we're only yielding headers that are in range and follow the current local head. - while self.queued_validated_headers.last().is_some_and(|last| last.number <= head.number) { + while self + .queued_validated_headers + .last() + .is_some_and(|last| last.number() <= head.number()) + { // headers are sorted high to low self.queued_validated_headers.pop(); } @@ -686,7 +700,7 @@ where .queued_validated_headers .first() .filter(|h| h.hash() == tip) - .map(|h| h.number) + .map(|h| h.number()) { self.sync_target = Some(new_sync_target.with_number(target_number)); return @@ -740,9 +754,9 @@ where impl Stream for ReverseHeadersDownloader where - H: HeadersClient + 'static, + H: HeadersClient + 'static, { - type Item = HeadersDownloaderResult>; + type Item = HeadersDownloaderResult>, H::Header>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -883,18 +897,18 @@ where } } -/// A future that returns a list of [`Header`] on success. +/// A future that returns a list of headers on success. #[derive(Debug)] struct HeadersRequestFuture { request: Option, fut: F, } -impl Future for HeadersRequestFuture +impl Future for HeadersRequestFuture where - F: Future>> + Sync + Send + Unpin, + F: Future>> + Sync + Send + Unpin, { - type Output = HeadersRequestOutcome; + type Output = HeadersRequestOutcome; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -906,14 +920,14 @@ where } /// The outcome of the [`HeadersRequestFuture`] -struct HeadersRequestOutcome { +struct HeadersRequestOutcome { request: HeadersRequest, - outcome: PeerRequestResult>, + outcome: PeerRequestResult>, } // === impl OrderedHeadersResponse === -impl HeadersRequestOutcome { +impl HeadersRequestOutcome { fn block_number(&self) -> u64 { self.request.start.as_number().expect("is number") } @@ -921,35 +935,35 @@ impl HeadersRequestOutcome { /// Wrapper type to order responses #[derive(Debug)] -struct OrderedHeadersResponse { - headers: Vec
, +struct OrderedHeadersResponse { + headers: Vec, request: HeadersRequest, peer_id: PeerId, } // === impl OrderedHeadersResponse === -impl OrderedHeadersResponse { +impl OrderedHeadersResponse { fn block_number(&self) -> u64 { self.request.start.as_number().expect("is number") } } -impl PartialEq for OrderedHeadersResponse { +impl PartialEq for OrderedHeadersResponse { fn eq(&self, other: &Self) -> bool { self.block_number() == other.block_number() } } -impl Eq for OrderedHeadersResponse {} +impl Eq for OrderedHeadersResponse {} -impl PartialOrd for OrderedHeadersResponse { +impl PartialOrd for OrderedHeadersResponse { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for OrderedHeadersResponse { +impl Ord for OrderedHeadersResponse { fn cmp(&self, other: &Self) -> Ordering { self.block_number().cmp(&other.block_number()) } @@ -1156,7 +1170,11 @@ impl ReverseHeadersDownloaderBuilder { /// Build [`ReverseHeadersDownloader`] with provided consensus /// and header client implementations - pub fn build(self, client: H, consensus: Arc) -> ReverseHeadersDownloader + pub fn build( + self, + client: H, + consensus: Arc>, + ) -> ReverseHeadersDownloader where H: HeadersClient + 'static, { @@ -1214,6 +1232,7 @@ fn calc_next_request( mod tests { use super::*; use crate::headers::test_utils::child_header; + use alloy_consensus::Header; use assert_matches::assert_matches; use reth_consensus::test_utils::TestConsensus; use reth_network_p2p::test_utils::TestHeadersClient; @@ -1296,7 +1315,7 @@ mod tests { assert!(downloader.sync_target_request.is_some()); downloader.sync_target_request.take(); - let target = SyncTarget::Gap(SealedHeader::new(Header::default(), B256::random())); + let target = SyncTarget::Gap(SealedHeader::new(Default::default(), B256::random())); downloader.update_sync_target(target); assert!(downloader.sync_target_request.is_none()); assert_matches!( @@ -1373,7 +1392,7 @@ mod tests { fn test_resp_order() { let mut heap = BinaryHeap::new(); let hi = 1u64; - heap.push(OrderedHeadersResponse { + heap.push(OrderedHeadersResponse::
{ headers: vec![], request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() }, peer_id: Default::default(), diff --git a/crates/net/downloaders/src/headers/task.rs b/crates/net/downloaders/src/headers/task.rs index b3fa27fde..81c4cd80d 100644 --- a/crates/net/downloaders/src/headers/task.rs +++ b/crates/net/downloaders/src/headers/task.rs @@ -22,15 +22,15 @@ pub const HEADERS_TASK_BUFFER_SIZE: usize = 8; /// A [HeaderDownloader] that drives a spawned [HeaderDownloader] on a spawned task. #[derive(Debug)] #[pin_project] -pub struct TaskDownloader { +pub struct TaskDownloader { #[pin] - from_downloader: ReceiverStream>>, - to_downloader: UnboundedSender, + from_downloader: ReceiverStream>, H>>, + to_downloader: UnboundedSender>, } // === impl TaskDownloader === -impl TaskDownloader { +impl TaskDownloader { /// Spawns the given `downloader` via [`tokio::task::spawn`] and returns a [`TaskDownloader`] /// that's connected to that task. /// @@ -46,7 +46,8 @@ impl TaskDownloader { /// # use reth_downloaders::headers::task::TaskDownloader; /// # use reth_consensus::Consensus; /// # use reth_network_p2p::headers::client::HeadersClient; - /// # fn t(consensus:Arc, client: Arc) { + /// # use reth_primitives_traits::BlockHeader; + /// # fn t + 'static>(consensus:Arc>, client: Arc) { /// let downloader = ReverseHeadersDownloader::::builder().build( /// client, /// consensus @@ -55,7 +56,7 @@ impl TaskDownloader { /// # } pub fn spawn(downloader: T) -> Self where - T: HeaderDownloader + 'static, + T: HeaderDownloader
+ 'static, { Self::spawn_with(downloader, &TokioTaskExecutor::default()) } @@ -64,7 +65,7 @@ impl TaskDownloader { /// that's connected to that task. pub fn spawn_with(downloader: T, spawner: &S) -> Self where - T: HeaderDownloader + 'static, + T: HeaderDownloader
+ 'static, S: TaskSpawner, { let (headers_tx, headers_rx) = mpsc::channel(HEADERS_TASK_BUFFER_SIZE); @@ -81,12 +82,14 @@ impl TaskDownloader { } } -impl HeaderDownloader for TaskDownloader { - fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) { +impl HeaderDownloader for TaskDownloader { + type Header = H; + + fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) { let _ = self.to_downloader.send(DownloaderUpdates::UpdateSyncGap(head, target)); } - fn update_local_head(&mut self, head: SealedHeader) { + fn update_local_head(&mut self, head: SealedHeader) { let _ = self.to_downloader.send(DownloaderUpdates::UpdateLocalHead(head)); } @@ -99,8 +102,8 @@ impl HeaderDownloader for TaskDownloader { } } -impl Stream for TaskDownloader { - type Item = HeadersDownloaderResult>; +impl Stream for TaskDownloader { + type Item = HeadersDownloaderResult>, H>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().from_downloader.poll_next(cx) @@ -108,9 +111,10 @@ impl Stream for TaskDownloader { } /// A [`HeaderDownloader`] that runs on its own task -struct SpawnedDownloader { - updates: UnboundedReceiverStream, - headers_tx: PollSender>>, +#[expect(clippy::complexity)] +struct SpawnedDownloader { + updates: UnboundedReceiverStream>, + headers_tx: PollSender>, T::Header>>, downloader: T, } @@ -170,9 +174,9 @@ impl Future for SpawnedDownloader { } /// Commands delegated tot the spawned [`HeaderDownloader`] -enum DownloaderUpdates { - UpdateSyncGap(SealedHeader, SyncTarget), - UpdateLocalHead(SealedHeader), +enum DownloaderUpdates { + UpdateSyncGap(SealedHeader, SyncTarget), + UpdateLocalHead(SealedHeader), UpdateSyncTarget(SyncTarget), SetBatchSize(usize), } diff --git a/crates/net/downloaders/src/test_utils/bodies_client.rs b/crates/net/downloaders/src/test_utils/bodies_client.rs index be8373f82..d84d92363 100644 --- a/crates/net/downloaders/src/test_utils/bodies_client.rs +++ b/crates/net/downloaders/src/test_utils/bodies_client.rs @@ -78,6 +78,7 @@ impl DownloadClient for TestBodiesClient { } impl BodiesClient for TestBodiesClient { + type Body = BlockBody; type Output = BodiesFut; fn get_block_bodies_with_priority( diff --git a/crates/net/eth-wire-types/Cargo.toml b/crates/net/eth-wire-types/Cargo.toml index 1d2b54872..582ab7557 100644 --- a/crates/net/eth-wire-types/Cargo.toml +++ b/crates/net/eth-wire-types/Cargo.toml @@ -18,6 +18,7 @@ reth-codecs-derive.workspace = true reth-primitives.workspace = true # ethereum +alloy-consensus.workspace = true alloy-chains = { workspace = true, features = ["rlp"] } alloy-eips.workspace = true alloy-primitives.workspace = true diff --git a/crates/net/eth-wire-types/src/message.rs b/crates/net/eth-wire-types/src/message.rs index cca6600d1..2a6d973ff 100644 --- a/crates/net/eth-wire-types/src/message.rs +++ b/crates/net/eth-wire-types/src/message.rs @@ -148,7 +148,7 @@ impl From> for ProtocolMessage { /// Represents messages that can be sent to multiple peers. #[derive(Clone, Debug)] -pub struct ProtocolBroadcastMessage { +pub struct ProtocolBroadcastMessage { /// The unique identifier representing the type of the Ethereum message. pub message_type: EthMessageID, /// The content of the message to be broadcasted, including specific data based on the message diff --git a/crates/net/eth-wire-types/src/primitives.rs b/crates/net/eth-wire-types/src/primitives.rs index ca85fa69a..04b8b429e 100644 --- a/crates/net/eth-wire-types/src/primitives.rs +++ b/crates/net/eth-wire-types/src/primitives.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; +use alloy_consensus::BlockHeader; use alloy_rlp::{Decodable, Encodable}; /// Abstraction over primitive types which might appear in network messages. See @@ -10,7 +11,8 @@ pub trait NetworkPrimitives: Send + Sync + Unpin + Clone + Debug + PartialEq + Eq + 'static { /// The block header type. - type BlockHeader: Encodable + type BlockHeader: BlockHeader + + Encodable + Decodable + Send + Sync diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index d60e50074..625971e0e 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -5,10 +5,11 @@ use crate::{ p2pstream::MAX_RESERVED_MESSAGE_ID, protocol::{ProtoVersion, Protocol}, version::ParseVersionError, - Capability, EthMessage, EthMessageID, EthVersion, + Capability, EthMessageID, EthVersion, }; use alloy_primitives::bytes::Bytes; use derive_more::{Deref, DerefMut}; +use reth_eth_wire_types::{EthMessage, EthNetworkPrimitives, NetworkPrimitives}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use std::{ @@ -30,9 +31,13 @@ pub struct RawCapabilityMessage { /// network. #[derive(Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum CapabilityMessage { +pub enum CapabilityMessage { /// Eth sub-protocol message. - Eth(EthMessage), + #[cfg_attr( + feature = "serde", + serde(bound = "EthMessage: Serialize + serde::de::DeserializeOwned") + )] + Eth(EthMessage), /// Any other capability message. Other(RawCapabilityMessage), } diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 795dd6307..c971f6182 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -8,6 +8,7 @@ use crate::{ use alloy_primitives::bytes::{Bytes, BytesMut}; use futures::{ready, Sink, SinkExt, StreamExt}; use pin_project::pin_project; +use reth_eth_wire_types::NetworkPrimitives; use reth_primitives::{ForkFilter, GotExpected}; use std::{ pin::Pin, @@ -54,32 +55,32 @@ where /// Consumes the [`UnauthedEthStream`] and returns an [`EthStream`] after the `Status` /// handshake is completed successfully. This also returns the `Status` message sent by the /// remote peer. - pub async fn handshake( + pub async fn handshake( self, status: Status, fork_filter: ForkFilter, - ) -> Result<(EthStream, Status), EthStreamError> { + ) -> Result<(EthStream, Status), EthStreamError> { self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await } /// Wrapper around handshake which enforces a timeout. - pub async fn handshake_with_timeout( + pub async fn handshake_with_timeout( self, status: Status, fork_filter: ForkFilter, timeout_limit: Duration, - ) -> Result<(EthStream, Status), EthStreamError> { + ) -> Result<(EthStream, Status), EthStreamError> { timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter)) .await .map_err(|_| EthStreamError::StreamTimeout)? } /// Handshake with no timeout - pub async fn handshake_without_timeout( + pub async fn handshake_without_timeout( mut self, status: Status, fork_filter: ForkFilter, - ) -> Result<(EthStream, Status), EthStreamError> { + ) -> Result<(EthStream, Status), EthStreamError> { trace!( %status, "sending eth status to peer" @@ -89,10 +90,8 @@ where // The max length for a status with TTD is: + self.inner .send( - alloy_rlp::encode(ProtocolMessage::from( - EthMessage::::Status(status), - )) - .into(), + alloy_rlp::encode(ProtocolMessage::::from(EthMessage::::Status(status))) + .into(), ) .await?; @@ -112,15 +111,14 @@ where } let version = status.version; - let msg: ProtocolMessage = - match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) { - Ok(m) => m, - Err(err) => { - debug!("decode error in eth handshake: msg={their_msg:x}"); - self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; - return Err(EthStreamError::InvalidMessage(err)) - } - }; + let msg = match ProtocolMessage::::decode_message(version, &mut their_msg.as_ref()) { + Ok(m) => m, + Err(err) => { + debug!("decode error in eth handshake: msg={their_msg:x}"); + self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; + return Err(EthStreamError::InvalidMessage(err)) + } + }; // The following checks should match the checks in go-ethereum: // https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89 @@ -194,19 +192,21 @@ where /// compatible with eth-networking protocol messages, which get RLP encoded/decoded. #[pin_project] #[derive(Debug)] -pub struct EthStream { +pub struct EthStream { /// Negotiated eth version. version: EthVersion, #[pin] inner: S, + + _pd: std::marker::PhantomData, } -impl EthStream { +impl EthStream { /// Creates a new unauthed [`EthStream`] from a provided stream. You will need /// to manually handshake a peer. #[inline] pub const fn new(version: EthVersion, inner: S) -> Self { - Self { version, inner } + Self { version, inner, _pd: std::marker::PhantomData } } /// Returns the eth version. @@ -234,15 +234,16 @@ impl EthStream { } } -impl EthStream +impl EthStream where S: Sink + Unpin, EthStreamError: From, + N: NetworkPrimitives, { /// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead. pub fn start_send_broadcast( &mut self, - item: EthBroadcastMessage, + item: EthBroadcastMessage, ) -> Result<(), EthStreamError> { self.inner.start_send_unpin(Bytes::from(alloy_rlp::encode( ProtocolBroadcastMessage::from(item), @@ -252,12 +253,13 @@ where } } -impl Stream for EthStream +impl Stream for EthStream where S: Stream> + Unpin, EthStreamError: From, + N: NetworkPrimitives, { - type Item = Result; + type Item = Result, EthStreamError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); @@ -299,10 +301,11 @@ where } } -impl Sink for EthStream +impl Sink> for EthStream where S: CanDisconnect + Unpin, EthStreamError: From<>::Error>, + N: NetworkPrimitives, { type Error = EthStreamError; @@ -310,7 +313,7 @@ where self.project().inner.poll_ready(cx).map_err(Into::into) } - fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> { if matches!(item, EthMessage::Status(_)) { // TODO: to disconnect here we would need to do something similar to P2PStream's // start_disconnect, which would ideally be a part of the CanDisconnect trait, or at @@ -340,10 +343,11 @@ where } } -impl CanDisconnect for EthStream +impl CanDisconnect> for EthStream where S: CanDisconnect + Send, EthStreamError: From<>::Error>, + N: NetworkPrimitives, { async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), EthStreamError> { self.inner.disconnect(reason).await.map_err(Into::into) @@ -365,6 +369,7 @@ mod tests { use futures::{SinkExt, StreamExt}; use reth_chainspec::NamedChain; use reth_ecies::stream::ECIESStream; + use reth_eth_wire_types::EthNetworkPrimitives; use reth_network_peers::pk2id; use reth_primitives::{ForkFilter, Head}; use secp256k1::{SecretKey, SECP256K1}; @@ -397,7 +402,7 @@ mod tests { let (incoming, _) = listener.accept().await.unwrap(); let stream = PassthroughCodec::default().framed(incoming); let (_, their_status) = UnauthedEthStream::new(stream) - .handshake(status_clone, fork_filter_clone) + .handshake::(status_clone, fork_filter_clone) .await .unwrap(); @@ -409,8 +414,10 @@ mod tests { let sink = PassthroughCodec::default().framed(outgoing); // try to connect - let (_, their_status) = - UnauthedEthStream::new(sink).handshake(status, fork_filter).await.unwrap(); + let (_, their_status) = UnauthedEthStream::new(sink) + .handshake::(status, fork_filter) + .await + .unwrap(); // their status is a clone of our status, these should be equal assert_eq!(their_status, status); @@ -444,7 +451,7 @@ mod tests { let (incoming, _) = listener.accept().await.unwrap(); let stream = PassthroughCodec::default().framed(incoming); let (_, their_status) = UnauthedEthStream::new(stream) - .handshake(status_clone, fork_filter_clone) + .handshake::(status_clone, fork_filter_clone) .await .unwrap(); @@ -456,8 +463,10 @@ mod tests { let sink = PassthroughCodec::default().framed(outgoing); // try to connect - let (_, their_status) = - UnauthedEthStream::new(sink).handshake(status, fork_filter).await.unwrap(); + let (_, their_status) = UnauthedEthStream::new(sink) + .handshake::(status, fork_filter) + .await + .unwrap(); // their status is a clone of our status, these should be equal assert_eq!(their_status, status); @@ -490,8 +499,9 @@ mod tests { // roughly based off of the design of tokio::net::TcpListener let (incoming, _) = listener.accept().await.unwrap(); let stream = PassthroughCodec::default().framed(incoming); - let handshake_res = - UnauthedEthStream::new(stream).handshake(status_clone, fork_filter_clone).await; + let handshake_res = UnauthedEthStream::new(stream) + .handshake::(status_clone, fork_filter_clone) + .await; // make sure the handshake fails due to td too high assert!(matches!( @@ -506,7 +516,9 @@ mod tests { let sink = PassthroughCodec::default().framed(outgoing); // try to connect - let handshake_res = UnauthedEthStream::new(sink).handshake(status, fork_filter).await; + let handshake_res = UnauthedEthStream::new(sink) + .handshake::(status, fork_filter) + .await; // this handshake should also fail due to td too high assert!(matches!( @@ -524,7 +536,7 @@ mod tests { async fn can_write_and_read_cleartext() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); - let test_msg = EthMessage::NewBlockHashes( + let test_msg: EthMessage = EthMessage::NewBlockHashes( vec![ BlockHashNumber { hash: B256::random(), number: 5 }, BlockHashNumber { hash: B256::random(), number: 6 }, @@ -559,7 +571,7 @@ mod tests { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); let server_key = SecretKey::new(&mut rand::thread_rng()); - let test_msg = EthMessage::NewBlockHashes( + let test_msg: EthMessage = EthMessage::NewBlockHashes( vec![ BlockHashNumber { hash: B256::random(), number: 5 }, BlockHashNumber { hash: B256::random(), number: 6 }, @@ -601,7 +613,7 @@ mod tests { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); let server_key = SecretKey::new(&mut rand::thread_rng()); - let test_msg = EthMessage::NewBlockHashes( + let test_msg: EthMessage = EthMessage::NewBlockHashes( vec![ BlockHashNumber { hash: B256::random(), number: 5 }, BlockHashNumber { hash: B256::random(), number: 6 }, @@ -705,7 +717,7 @@ mod tests { let (incoming, _) = listener.accept().await.unwrap(); let stream = PassthroughCodec::default().framed(incoming); let (_, their_status) = UnauthedEthStream::new(stream) - .handshake(status_clone, fork_filter_clone) + .handshake::(status_clone, fork_filter_clone) .await .unwrap(); @@ -718,7 +730,11 @@ mod tests { // try to connect let handshake_result = UnauthedEthStream::new(sink) - .handshake_with_timeout(status, fork_filter, Duration::from_secs(1)) + .handshake_with_timeout::( + status, + fork_filter, + Duration::from_secs(1), + ) .await; // Assert that a timeout error occurred diff --git a/crates/net/eth-wire/src/multiplex.rs b/crates/net/eth-wire/src/multiplex.rs index d1d977aba..6f882f408 100644 --- a/crates/net/eth-wire/src/multiplex.rs +++ b/crates/net/eth-wire/src/multiplex.rs @@ -24,6 +24,7 @@ use crate::{ }; use bytes::{Bytes, BytesMut}; use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; +use reth_eth_wire_types::NetworkPrimitives; use reth_primitives::ForkFilter; use tokio::sync::{mpsc, mpsc::UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -204,11 +205,11 @@ impl RlpxProtocolMultiplexer { /// Converts this multiplexer into a [`RlpxSatelliteStream`] with eth protocol as the given /// primary protocol. - pub async fn into_eth_satellite_stream( + pub async fn into_eth_satellite_stream( self, status: Status, fork_filter: ForkFilter, - ) -> Result<(RlpxSatelliteStream>, Status), EthStreamError> + ) -> Result<(RlpxSatelliteStream>, Status), EthStreamError> where St: Stream> + Sink + Unpin, { @@ -674,6 +675,7 @@ mod tests { }, UnauthedP2PStream, }; + use reth_eth_wire_types::EthNetworkPrimitives; use tokio::{net::TcpListener, sync::oneshot}; use tokio_util::codec::Decoder; @@ -693,7 +695,7 @@ mod tests { UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream) - .handshake(other_status, other_fork_filter) + .handshake::(other_status, other_fork_filter) .await .unwrap(); @@ -708,7 +710,9 @@ mod tests { .into_satellite_stream_with_handshake( eth.capability().as_ref(), move |proxy| async move { - UnauthedEthStream::new(proxy).handshake(status, fork_filter).await + UnauthedEthStream::new(proxy) + .handshake::(status, fork_filter) + .await }, ) .await @@ -731,7 +735,7 @@ mod tests { let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn) - .into_eth_satellite_stream(other_status, other_fork_filter) + .into_eth_satellite_stream::(other_status, other_fork_filter) .await .unwrap(); @@ -762,7 +766,7 @@ mod tests { let conn = connect_passthrough(local_addr, test_hello().0).await; let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn) - .into_eth_satellite_stream(status, fork_filter) + .into_eth_satellite_stream::(status, fork_filter) .await .unwrap(); diff --git a/crates/net/network-api/src/downloaders.rs b/crates/net/network-api/src/downloaders.rs index f081c16ed..cbfe81613 100644 --- a/crates/net/network-api/src/downloaders.rs +++ b/crates/net/network-api/src/downloaders.rs @@ -1,5 +1,7 @@ //! API related to syncing blocks. +use std::fmt::Debug; + use futures::Future; use reth_network_p2p::BlockClient; use tokio::sync::oneshot; @@ -7,10 +9,13 @@ use tokio::sync::oneshot; /// Provides client for downloading blocks. #[auto_impl::auto_impl(&, Arc)] pub trait BlockDownloaderProvider { + /// The client this type can provide. + type Client: BlockClient + Send + Sync + Clone + 'static; + /// Returns a new [`BlockClient`], used for fetching blocks from peers. /// /// The client is the entrypoint for sending block requests to the network. fn fetch_client( &self, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index d2bd66d1f..af392b6f9 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -4,8 +4,9 @@ use std::{fmt, net::SocketAddr, sync::Arc}; use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, - EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, - NodeData, PooledTransactions, Receipts, Status, + EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, + GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts, + Status, }; use reth_ethereum_forks::ForkId; use reth_network_p2p::error::{RequestError, RequestResult}; @@ -30,8 +31,8 @@ pub trait NetworkEventListenerProvider: Send + Sync { /// /// This includes any event types that may be relevant to tasks, for metrics, keep track of peers /// etc. -#[derive(Debug, Clone)] -pub enum NetworkEvent { +#[derive(Debug)] +pub enum NetworkEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -50,7 +51,7 @@ pub enum NetworkEvent { /// Capabilities the peer announced capabilities: Arc, /// A request channel to the session task. - messages: PeerRequestSender, + messages: PeerRequestSender, /// The status of the peer to which a session was established. status: Arc, /// negotiated eth version of the session @@ -62,6 +63,35 @@ pub enum NetworkEvent { PeerRemoved(PeerId), } +impl Clone for NetworkEvent { + fn clone(&self) -> Self { + match self { + Self::SessionClosed { peer_id, reason } => { + Self::SessionClosed { peer_id: *peer_id, reason: *reason } + } + Self::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => Self::SessionEstablished { + peer_id: *peer_id, + remote_addr: *remote_addr, + client_version: client_version.clone(), + capabilities: capabilities.clone(), + messages: messages.clone(), + status: status.clone(), + version: *version, + }, + Self::PeerAdded(peer) => Self::PeerAdded(*peer), + Self::PeerRemoved(peer) => Self::PeerRemoved(*peer), + } + } +} + /// Events produced by the `Discovery` manager. #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { @@ -98,7 +128,7 @@ pub enum DiscoveredEvent { /// Protocol related request messages that expect a response #[derive(Debug)] -pub enum PeerRequest { +pub enum PeerRequest { /// Requests block headers from the peer. /// /// The response should be sent through the channel. @@ -106,7 +136,7 @@ pub enum PeerRequest { /// The request for block headers. request: GetBlockHeaders, /// The channel to send the response for block headers. - response: oneshot::Sender>, + response: oneshot::Sender>>, }, /// Requests block bodies from the peer. /// @@ -115,7 +145,7 @@ pub enum PeerRequest { /// The request for block bodies. request: GetBlockBodies, /// The channel to send the response for block bodies. - response: oneshot::Sender>, + response: oneshot::Sender>>, }, /// Requests pooled transactions from the peer. /// @@ -148,7 +178,7 @@ pub enum PeerRequest { // === impl PeerRequest === -impl PeerRequest { +impl PeerRequest { /// Invoked if we received a response which does not match the request pub fn send_bad_response(self) { self.send_err_response(RequestError::BadResponse) @@ -166,7 +196,7 @@ impl PeerRequest { } /// Returns the [`EthMessage`] for this type - pub fn create_request_message(&self, request_id: u64) -> EthMessage { + pub fn create_request_message(&self, request_id: u64) -> EthMessage { match self { Self::GetBlockHeaders { request, .. } => { EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request }) @@ -199,24 +229,29 @@ impl PeerRequest { } /// A Cloneable connection for sending _requests_ directly to the session of a peer. -#[derive(Clone)] -pub struct PeerRequestSender { +pub struct PeerRequestSender { /// id of the remote node. pub peer_id: PeerId, /// The Sender half connected to a session. - pub to_session_tx: mpsc::Sender, + pub to_session_tx: mpsc::Sender, +} + +impl Clone for PeerRequestSender { + fn clone(&self) -> Self { + Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() } + } } // === impl PeerRequestSender === -impl PeerRequestSender { +impl PeerRequestSender { /// Constructs a new sender instance that's wired to a session - pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender) -> Self { + pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender) -> Self { Self { peer_id, to_session_tx } } /// Attempts to immediately send a message on this Sender - pub fn try_send(&self, req: PeerRequest) -> Result<(), mpsc::error::TrySendError> { + pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError> { self.to_session_tx.try_send(req) } @@ -226,7 +261,7 @@ impl PeerRequestSender { } } -impl fmt::Debug for PeerRequestSender { +impl fmt::Debug for PeerRequestSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive() } diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 6163c8730..986d490c3 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -36,6 +36,7 @@ pub use events::{ use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant}; use reth_eth_wire_types::{capability::Capabilities, DisconnectReason, EthVersion, Status}; +use reth_network_p2p::EthBlockClient; use reth_network_peers::NodeRecord; /// The `PeerId` type. @@ -43,7 +44,7 @@ pub type PeerId = alloy_primitives::B512; /// Helper trait that unifies network API needed to launch node. pub trait FullNetwork: - BlockDownloaderProvider + BlockDownloaderProvider + NetworkSyncUpdater + NetworkInfo + NetworkEventListenerProvider @@ -55,7 +56,7 @@ pub trait FullNetwork: } impl FullNetwork for T where - T: BlockDownloaderProvider + T: BlockDownloaderProvider + NetworkSyncUpdater + NetworkInfo + NetworkEventListenerProvider diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index f444aa7fe..148eef34b 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -34,6 +34,7 @@ reth-network-peers = { workspace = true, features = ["net"] } reth-network-types.workspace = true # ethereum +alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rlp.workspace = true diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index c47ee5d23..584c079b8 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -7,6 +7,7 @@ use std::sync::{ use alloy_primitives::B256; use futures::{future, future::Either}; +use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ bodies::client::{BodiesClient, BodiesFut}, @@ -17,7 +18,6 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; -use reth_primitives::Header; use tokio::sync::{mpsc::UnboundedSender, oneshot}; use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse}; @@ -30,16 +30,16 @@ use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse}; /// /// include_mmd!("docs/mermaid/fetch-client.mmd") #[derive(Debug, Clone)] -pub struct FetchClient { +pub struct FetchClient { /// Sender half of the request channel. - pub(crate) request_tx: UnboundedSender, + pub(crate) request_tx: UnboundedSender>, /// The handle to the peers pub(crate) peers_handle: PeersHandle, /// Number of active peer sessions the node's currently handling. pub(crate) num_active_peers: Arc, } -impl DownloadClient for FetchClient { +impl DownloadClient for FetchClient { fn report_bad_message(&self, peer_id: PeerId) { self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage); } @@ -53,8 +53,9 @@ impl DownloadClient for FetchClient { // or an error. type HeadersClientFuture = Either, future::Ready>; -impl HeadersClient for FetchClient { - type Output = HeadersClientFuture>>; +impl HeadersClient for FetchClient { + type Header = N::BlockHeader; + type Output = HeadersClientFuture>>; /// Sends a `GetBlockHeaders` request to an available peer. fn get_headers_with_priority( @@ -75,8 +76,9 @@ impl HeadersClient for FetchClient { } } -impl BodiesClient for FetchClient { - type Output = BodiesFut; +impl BodiesClient for FetchClient { + type Body = N::BlockBody; + type Output = BodiesFut; /// Sends a `GetBlockBodies` request to an available peer. fn get_block_bodies_with_priority( diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index d37fa8b4f..8af6300b7 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -15,7 +15,7 @@ use std::{ use alloy_primitives::B256; use futures::StreamExt; -use reth_eth_wire::{GetBlockBodies, GetBlockHeaders}; +use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, @@ -24,12 +24,14 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; -use reth_primitives::{BlockBody, Header}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::message::BlockRequest; +type InflightHeadersRequest = Request>>; +type InflightBodiesRequest = Request, PeerRequestResult>>; + /// Manages data fetching operations. /// /// This type is hooked into the staged sync pipeline and delegates download request to available @@ -37,13 +39,11 @@ use crate::message::BlockRequest; /// /// This type maintains a list of connected peers that are available for requests. #[derive(Debug)] -pub struct StateFetcher { +pub struct StateFetcher { /// Currently active [`GetBlockHeaders`] requests - inflight_headers_requests: - HashMap>>>, + inflight_headers_requests: HashMap>, /// Currently active [`GetBlockBodies`] requests - inflight_bodies_requests: - HashMap, PeerRequestResult>>>, + inflight_bodies_requests: HashMap>, /// The list of _available_ peers for requests. peers: HashMap, /// The handle to the peers manager @@ -51,16 +51,16 @@ pub struct StateFetcher { /// Number of active peer sessions the node's currently handling. num_active_peers: Arc, /// Requests queued for processing - queued_requests: VecDeque, + queued_requests: VecDeque>, /// Receiver for new incoming download requests - download_requests_rx: UnboundedReceiverStream, + download_requests_rx: UnboundedReceiverStream>, /// Sender for download requests, used to detach a [`FetchClient`] - download_requests_tx: UnboundedSender, + download_requests_tx: UnboundedSender>, } // === impl StateSyncer === -impl StateFetcher { +impl StateFetcher { pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc) -> Self { let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); Self { @@ -217,7 +217,7 @@ impl StateFetcher { /// Handles a new request to a peer. /// /// Caution: this assumes the peer exists and is idle - fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest) -> BlockRequest { + fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest) -> BlockRequest { // update the peer's state if let Some(peer) = self.peers.get_mut(&peer_id) { peer.state = req.peer_state(); @@ -260,7 +260,7 @@ impl StateFetcher { pub(crate) fn on_block_headers_response( &mut self, peer_id: PeerId, - res: RequestResult>, + res: RequestResult>, ) -> Option { let is_error = res.is_err(); let maybe_reputation_change = res.reputation_change_err(); @@ -296,7 +296,7 @@ impl StateFetcher { pub(crate) fn on_block_bodies_response( &mut self, peer_id: PeerId, - res: RequestResult>, + res: RequestResult>, ) -> Option { let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty()); @@ -315,7 +315,7 @@ impl StateFetcher { } /// Returns a new [`FetchClient`] that can send requests to this type. - pub(crate) fn client(&self) -> FetchClient { + pub(crate) fn client(&self) -> FetchClient { FetchClient { request_tx: self.download_requests_tx.clone(), peers_handle: self.peers_handle.clone(), @@ -405,24 +405,24 @@ struct Request { /// Requests that can be sent to the Syncer from a [`FetchClient`] #[derive(Debug)] -pub(crate) enum DownloadRequest { +pub(crate) enum DownloadRequest { /// Download the requested headers and send response through channel GetBlockHeaders { request: HeadersRequest, - response: oneshot::Sender>>, + response: oneshot::Sender>>, priority: Priority, }, /// Download the requested headers and send response through channel GetBlockBodies { request: Vec, - response: oneshot::Sender>>, + response: oneshot::Sender>>, priority: Priority, }, } // === impl DownloadRequest === -impl DownloadRequest { +impl DownloadRequest { /// Returns the corresponding state for a peer that handles the request. const fn peer_state(&self) -> PeerState { match self { @@ -472,13 +472,14 @@ pub(crate) enum BlockResponseOutcome { mod tests { use super::*; use crate::{peers::PeersManager, PeersConfig}; + use alloy_consensus::Header; use alloy_primitives::B512; use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] async fn test_poll_fetcher() { let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default()); poll_fn(move |cx| { assert!(fetcher.poll(cx).is_pending()); @@ -498,7 +499,7 @@ mod tests { #[tokio::test] async fn test_peer_rotation() { let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default()); // Add a few random peers let peer1 = B512::random(); let peer2 = B512::random(); @@ -521,7 +522,7 @@ mod tests { #[tokio::test] async fn test_peer_prioritization() { let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default()); // Add a few random peers let peer1 = B512::random(); let peer2 = B512::random(); @@ -546,7 +547,7 @@ mod tests { #[tokio::test] async fn test_on_block_headers_response() { let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default()); let peer_id = B512::random(); assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None); @@ -576,7 +577,7 @@ mod tests { #[tokio::test] async fn test_header_response_outcome() { let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default()); let peer_id = B512::random(); let request_pair = || { @@ -610,7 +611,10 @@ mod tests { let outcome = fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap(); - assert!(EthResponseValidator::reputation_change_err(&Err(RequestError::Timeout)).is_some()); + assert!(EthResponseValidator::reputation_change_err(&Err::, _>( + RequestError::Timeout + )) + .is_some()); match outcome { BlockResponseOutcome::BadResponse(peer, _) => { diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 6b8287fe5..bdb13875f 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -12,12 +12,13 @@ use alloy_primitives::{Bytes, B256}; use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage, - GetBlockBodies, GetBlockHeaders, NewBlock, NewBlockHashes, NewPooledTransactionHashes, - NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, + EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock, + NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, + SharedTransactions, Transactions, }; use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; -use reth_primitives::{BlockBody, Header, PooledTransactionsElement, ReceiptWithBloom}; +use reth_primitives::{PooledTransactionsElement, ReceiptWithBloom}; use tokio::sync::oneshot; /// Internal form of a `NewBlock` message @@ -74,16 +75,16 @@ pub enum BlockRequest { /// Corresponding variant for [`PeerRequest`]. #[derive(Debug)] -pub enum PeerResponse { +pub enum PeerResponse { /// Represents a response to a request for block headers. BlockHeaders { /// The receiver channel for the response to a block headers request. - response: oneshot::Receiver>, + response: oneshot::Receiver>>, }, /// Represents a response to a request for block bodies. BlockBodies { /// The receiver channel for the response to a block bodies request. - response: oneshot::Receiver>, + response: oneshot::Receiver>>, }, /// Represents a response to a request for pooled transactions. PooledTransactions { @@ -104,9 +105,9 @@ pub enum PeerResponse { // === impl PeerResponse === -impl PeerResponse { +impl PeerResponse { /// Polls the type to completion. - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { macro_rules! poll_request { ($response:ident, $item:ident, $cx:ident) => { match ready!($response.poll_unpin($cx)) { @@ -139,11 +140,11 @@ impl PeerResponse { /// All response variants for [`PeerResponse`] #[derive(Debug)] -pub enum PeerResponseResult { +pub enum PeerResponseResult { /// Represents a result containing block headers or an error. - BlockHeaders(RequestResult>), + BlockHeaders(RequestResult>), /// Represents a result containing block bodies or an error. - BlockBodies(RequestResult>), + BlockBodies(RequestResult>), /// Represents a result containing pooled transactions or an error. PooledTransactions(RequestResult>), /// Represents a result containing node data or an error. @@ -154,9 +155,9 @@ pub enum PeerResponseResult { // === impl PeerResponseResult === -impl PeerResponseResult { +impl PeerResponseResult { /// Converts this response into an [`EthMessage`] - pub fn try_into_message(self, id: u64) -> RequestResult { + pub fn try_into_message(self, id: u64) -> RequestResult> { macro_rules! to_message { ($response:ident, $item:ident, $request_id:ident) => { match $response { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 594ad4d15..4175757e0 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -18,10 +18,7 @@ use reth_network_api::{ NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, PeersInfo, }; -use reth_network_p2p::{ - sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}, - BlockClient, -}; +use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind}; use reth_primitives::{Head, TransactionSigned}; @@ -400,7 +397,9 @@ impl NetworkSyncUpdater for NetworkHandle { } impl BlockDownloaderProvider for NetworkHandle { - async fn fetch_client(&self) -> Result { + type Client = FetchClient; + + async fn fetch_client(&self) -> Result { let (tx, rx) = oneshot::channel(); let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx)); rx.await diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 5caa656a9..9ad7b5351 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -14,7 +14,10 @@ use std::{ use alloy_primitives::B256; use rand::seq::SliceRandom; -use reth_eth_wire::{BlockHashNumber, Capabilities, DisconnectReason, NewBlockHashes, Status}; +use reth_eth_wire::{ + BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, + NewBlockHashes, Status, +}; use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender}; use reth_network_peers::PeerId; use reth_network_types::{PeerAddr, PeerKind}; @@ -69,9 +72,9 @@ impl Deref for BlockNumReader { /// /// This type is also responsible for responding for received request. #[derive(Debug)] -pub struct NetworkState { +pub struct NetworkState { /// All active peers and their state. - active_peers: HashMap, + active_peers: HashMap>, /// Manages connections to peers. peers_manager: PeersManager, /// Buffered messages until polled. @@ -88,10 +91,10 @@ pub struct NetworkState { /// The fetcher streams `RLPx` related requests on a per-peer basis to this type. This type /// will then queue in the request and notify the fetcher once the result has been /// received. - state_fetcher: StateFetcher, + state_fetcher: StateFetcher, } -impl NetworkState { +impl NetworkState { /// Create a new state instance with the given params pub(crate) fn new( client: BlockNumReader, @@ -126,7 +129,7 @@ impl NetworkState { } /// Returns a new [`FetchClient`] - pub(crate) fn fetch_client(&self) -> FetchClient { + pub(crate) fn fetch_client(&self) -> FetchClient { self.state_fetcher.client() } @@ -144,7 +147,7 @@ impl NetworkState { peer: PeerId, capabilities: Arc, status: Arc, - request_tx: PeerRequestSender, + request_tx: PeerRequestSender>, timeout: Arc, ) { debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible"); @@ -399,7 +402,11 @@ impl NetworkState { /// Delegates the response result to the fetcher which may return an outcome specific /// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be /// a follow-up request or an instruction to slash the peer's reputation. - fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult) -> Option { + fn on_eth_response( + &mut self, + peer: PeerId, + resp: PeerResponseResult, + ) -> Option { match resp { PeerResponseResult::BlockHeaders(res) => { let outcome = self.state_fetcher.on_block_headers_response(peer, res)?; @@ -492,16 +499,16 @@ impl NetworkState { /// /// For example known blocks,so we can decide what to announce. #[derive(Debug)] -pub(crate) struct ActivePeer { +pub(crate) struct ActivePeer { /// Best block of the peer. pub(crate) best_hash: B256, /// The capabilities of the remote peer. #[allow(dead_code)] pub(crate) capabilities: Arc, /// A communication channel directly to the session task. - pub(crate) request_tx: PeerRequestSender, + pub(crate) request_tx: PeerRequestSender>, /// The response receiver for a currently active request to that peer. - pub(crate) pending_response: Option, + pub(crate) pending_response: Option>, /// Blocks we know the peer has. pub(crate) blocks: LruCache, } diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 898553969..9348bf2d0 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -22,6 +22,7 @@ reth-network-types.workspace = true reth-storage-errors.workspace = true # ethereum +alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true @@ -55,5 +56,6 @@ std = [ "reth-primitives/std", "alloy-eips/std", "alloy-primitives/std", - "reth-primitives-traits/std" + "reth-primitives-traits/std", + "alloy-consensus/std", ] diff --git a/crates/net/p2p/src/bodies/client.rs b/crates/net/p2p/src/bodies/client.rs index 2a4b57c23..d48fccc6d 100644 --- a/crates/net/p2p/src/bodies/client.rs +++ b/crates/net/p2p/src/bodies/client.rs @@ -9,13 +9,16 @@ use futures::{Future, FutureExt}; use reth_primitives::BlockBody; /// The bodies future type -pub type BodiesFut = Pin>> + Send + Sync>>; +pub type BodiesFut = + Pin>> + Send + Sync>>; /// A client capable of downloading block bodies. #[auto_impl::auto_impl(&, Arc, Box)] pub trait BodiesClient: DownloadClient { + /// The body type this client fetches. + type Body: Send + Sync + Unpin + 'static; /// The output of the request future for querying block bodies. - type Output: Future>> + Sync + Send + Unpin; + type Output: Future>> + Sync + Send + Unpin; /// Fetches the block body for the requested block. fn get_block_bodies(&self, hashes: Vec) -> Self::Output { @@ -49,11 +52,11 @@ pub struct SingleBodyRequest { fut: Fut, } -impl Future for SingleBodyRequest +impl Future for SingleBodyRequest where - Fut: Future>> + Sync + Send + Unpin, + Fut: Future>> + Sync + Send + Unpin, { - type Output = PeerRequestResult>; + type Output = PeerRequestResult>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let resp = ready!(self.get_mut().fut.poll_unpin(cx)); diff --git a/crates/net/p2p/src/bodies/downloader.rs b/crates/net/p2p/src/bodies/downloader.rs index b55229fa2..f335b2143 100644 --- a/crates/net/p2p/src/bodies/downloader.rs +++ b/crates/net/p2p/src/bodies/downloader.rs @@ -5,14 +5,19 @@ use futures::Stream; use std::ops::RangeInclusive; /// Body downloader return type. -pub type BodyDownloaderResult = DownloadResult>; +pub type BodyDownloaderResult = DownloadResult>>; /// A downloader capable of fetching and yielding block bodies from block headers. /// /// A downloader represents a distinct strategy for submitting requests to download block bodies, /// while a [`BodiesClient`][crate::bodies::client::BodiesClient] represents a client capable of /// fulfilling these requests. -pub trait BodyDownloader: Send + Sync + Stream + Unpin { +pub trait BodyDownloader: + Send + Sync + Stream> + Unpin +{ + /// The type of the body that is being downloaded. + type Body: Send + Sync + Unpin + 'static; + /// Method for setting the download range. fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()>; } diff --git a/crates/net/p2p/src/bodies/response.rs b/crates/net/p2p/src/bodies/response.rs index 0a45008ac..8737647bd 100644 --- a/crates/net/p2p/src/bodies/response.rs +++ b/crates/net/p2p/src/bodies/response.rs @@ -1,17 +1,17 @@ use alloy_primitives::{BlockNumber, U256}; -use reth_primitives::{SealedBlock, SealedHeader}; +use reth_primitives::{BlockBody, SealedBlock, SealedHeader}; use reth_primitives_traits::InMemorySize; /// The block response #[derive(PartialEq, Eq, Debug, Clone)] -pub enum BlockResponse { +pub enum BlockResponse { /// Full block response (with transactions or ommers) - Full(SealedBlock), + Full(SealedBlock), /// The empty block response Empty(SealedHeader), } -impl BlockResponse { +impl BlockResponse { /// Return the reference to the response header pub const fn header(&self) -> &SealedHeader { match self { @@ -34,8 +34,7 @@ impl BlockResponse { } } -impl InMemorySize for BlockResponse { - /// Calculates a heuristic for the in-memory size of the [`BlockResponse`]. +impl InMemorySize for BlockResponse { #[inline] fn size(&self) -> usize { match self { diff --git a/crates/net/p2p/src/either.rs b/crates/net/p2p/src/either.rs index 30650069b..3f1182bd4 100644 --- a/crates/net/p2p/src/either.rs +++ b/crates/net/p2p/src/either.rs @@ -32,8 +32,9 @@ where impl BodiesClient for Either where A: BodiesClient, - B: BodiesClient, + B: BodiesClient, { + type Body = A::Body; type Output = Either; fn get_block_bodies_with_priority( @@ -51,8 +52,9 @@ where impl HeadersClient for Either where A: HeadersClient, - B: HeadersClient, + B: HeadersClient
, { + type Header = A::Header; type Output = Either; fn get_headers_with_priority( diff --git a/crates/net/p2p/src/error.rs b/crates/net/p2p/src/error.rs index 9394a9fdf..181a0b96b 100644 --- a/crates/net/p2p/src/error.rs +++ b/crates/net/p2p/src/error.rs @@ -1,13 +1,14 @@ use std::ops::RangeInclusive; use super::headers::client::HeadersRequest; +use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use alloy_primitives::{BlockNumber, B256}; use derive_more::{Display, Error}; use reth_consensus::ConsensusError; use reth_network_peers::WithPeerId; use reth_network_types::ReputationChangeKind; -use reth_primitives::{GotExpected, GotExpectedBoxed, Header}; +use reth_primitives::{GotExpected, GotExpectedBoxed}; use reth_storage_errors::{db::DatabaseError, provider::ProviderError}; use tokio::sync::{mpsc, oneshot}; @@ -26,7 +27,7 @@ pub trait EthResponseValidator { fn reputation_change_err(&self) -> Option; } -impl EthResponseValidator for RequestResult> { +impl EthResponseValidator for RequestResult> { fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool { match self { Ok(headers) => { @@ -38,7 +39,7 @@ impl EthResponseValidator for RequestResult> { match request.start { BlockHashOrNumber::Number(block_number) => { - headers.first().is_some_and(|header| block_number != header.number) + headers.first().is_some_and(|header| block_number != header.number()) } BlockHashOrNumber::Hash(_) => { // we don't want to hash the header @@ -216,6 +217,8 @@ impl From for DownloadError { #[cfg(test)] mod tests { + use alloy_consensus::Header; + use super::*; #[test] diff --git a/crates/net/p2p/src/full_block.rs b/crates/net/p2p/src/full_block.rs index a61d4ea12..8f176f8da 100644 --- a/crates/net/p2p/src/full_block.rs +++ b/crates/net/p2p/src/full_block.rs @@ -5,16 +5,18 @@ use crate::{ headers::client::{HeadersClient, SingleHeaderRequest}, BlockClient, }; +use alloy_consensus::BlockHeader; use alloy_primitives::{Sealable, B256}; use reth_consensus::Consensus; use reth_eth_wire_types::HeadersDirection; use reth_network_peers::WithPeerId; -use reth_primitives::{BlockBody, Header, SealedBlock, SealedHeader}; +use reth_primitives::{SealedBlock, SealedHeader}; use std::{ cmp::Reverse, collections::{HashMap, VecDeque}, fmt::Debug, future::Future, + hash::Hash, pin::Pin, sync::Arc, task::{ready, Context, Poll}, @@ -23,14 +25,23 @@ use tracing::debug; /// A Client that can fetch full blocks from the network. #[derive(Debug, Clone)] -pub struct FullBlockClient { +pub struct FullBlockClient +where + Client: BlockClient, +{ client: Client, - consensus: Arc, + consensus: Arc>, } -impl FullBlockClient { +impl FullBlockClient +where + Client: BlockClient, +{ /// Creates a new instance of `FullBlockClient`. - pub fn new(client: Client, consensus: Arc) -> Self { + pub fn new( + client: Client, + consensus: Arc>, + ) -> Self { Self { client, consensus } } @@ -111,16 +122,16 @@ where Client: BlockClient, { client: Client, - consensus: Arc, + consensus: Arc>, hash: B256, request: FullBlockRequest, - header: Option, - body: Option, + header: Option>, + body: Option>, } impl FetchFullBlockFuture where - Client: BlockClient, + Client: BlockClient, { /// Returns the hash of the block being requested. pub const fn hash(&self) -> &B256 { @@ -129,11 +140,11 @@ where /// If the header request is already complete, this returns the block number pub fn block_number(&self) -> Option { - self.header.as_ref().map(|h| h.number) + self.header.as_ref().map(|h| h.number()) } /// Returns the [`SealedBlock`] if the request is complete and valid. - fn take_block(&mut self) -> Option { + fn take_block(&mut self) -> Option> { if self.header.is_none() || self.body.is_none() { return None } @@ -157,7 +168,7 @@ where } } - fn on_block_response(&mut self, resp: WithPeerId) { + fn on_block_response(&mut self, resp: WithPeerId) { if let Some(ref header) = self.header { if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) { debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body"); @@ -173,9 +184,9 @@ where impl Future for FetchFullBlockFuture where - Client: BlockClient + 'static, + Client: BlockClient + 'static, { - type Output = SealedBlock; + type Output = SealedBlock; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -252,7 +263,7 @@ where impl Debug for FetchFullBlockFuture where - Client: BlockClient, + Client: BlockClient, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FetchFullBlockFuture") @@ -275,7 +286,7 @@ impl FullBlockRequest where Client: BlockClient, { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() { if let Poll::Ready(res) = fut.poll(cx) { self.header = None; @@ -296,18 +307,18 @@ where /// The result of a request for a single header or body. This is yielded by the `FullBlockRequest` /// future. -enum ResponseResult { - Header(PeerRequestResult>), - Body(PeerRequestResult>), +enum ResponseResult { + Header(PeerRequestResult>), + Body(PeerRequestResult>), } /// The response of a body request. #[derive(Debug)] -enum BodyResponse { +enum BodyResponse { /// Already validated against transaction root of header - Validated(BlockBody), + Validated(B), /// Still needs to be validated against header - PendingValidation(WithPeerId), + PendingValidation(WithPeerId), } /// A future that downloads a range of full blocks from the network. /// @@ -330,7 +341,7 @@ where /// The client used to fetch headers and bodies. client: Client, /// The consensus instance used to validate the blocks. - consensus: Arc, + consensus: Arc>, /// The block hash to start fetching from (inclusive). start_hash: B256, /// How many blocks to fetch: `len([start_hash, ..]) == count` @@ -338,16 +349,16 @@ where /// Requests for headers and bodies that are in progress. request: FullBlockRangeRequest, /// Fetched headers. - headers: Option>, + headers: Option>>, /// The next headers to request bodies for. This is drained as responses are received. - pending_headers: VecDeque, + pending_headers: VecDeque>, /// The bodies that have been received so far. - bodies: HashMap, + bodies: HashMap, BodyResponse>, } impl FetchFullBlockRangeFuture where - Client: BlockClient, + Client: BlockClient, { /// Returns the block hashes for the given range, if they are available. pub fn range_block_hashes(&self) -> Option> { @@ -362,14 +373,14 @@ where /// Inserts a block body, matching it with the `next_header`. /// /// Note: this assumes the response matches the next header in the queue. - fn insert_body(&mut self, body_response: BodyResponse) { + fn insert_body(&mut self, body_response: BodyResponse) { if let Some(header) = self.pending_headers.pop_front() { self.bodies.insert(header, body_response); } } /// Inserts multiple block bodies. - fn insert_bodies(&mut self, bodies: impl IntoIterator) { + fn insert_bodies(&mut self, bodies: impl IntoIterator>) { for body in bodies { self.insert_body(body); } @@ -388,7 +399,7 @@ where /// /// These are returned in falling order starting with the requested `hash`, i.e. with /// descending block numbers. - fn take_blocks(&mut self) -> Option> { + fn take_blocks(&mut self) -> Option>> { if !self.is_bodies_complete() { // not done with bodies yet return None @@ -445,7 +456,7 @@ where Some(valid_responses) } - fn on_headers_response(&mut self, headers: WithPeerId>) { + fn on_headers_response(&mut self, headers: WithPeerId>) { let (peer, mut headers_falling) = headers .map(|h| { h.into_iter() @@ -461,7 +472,7 @@ where // fill in the response if it's the correct length if headers_falling.len() == self.count as usize { // sort headers from highest to lowest block number - headers_falling.sort_unstable_by_key(|h| Reverse(h.number)); + headers_falling.sort_unstable_by_key(|h| Reverse(h.number())); // check the starting hash if headers_falling[0].hash() == self.start_hash { @@ -512,9 +523,9 @@ where impl Future for FetchFullBlockRangeFuture where - Client: BlockClient + 'static, + Client: BlockClient + 'static, { - type Output = Vec; + type Output = Vec>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -621,7 +632,10 @@ impl FullBlockRangeRequest where Client: BlockClient, { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() { if let Poll::Ready(res) = fut.poll(cx) { self.headers = None; @@ -642,13 +656,15 @@ where // The result of a request for headers or block bodies. This is yielded by the // `FullBlockRangeRequest` future. -enum RangeResponseResult { - Header(PeerRequestResult>), - Body(PeerRequestResult>), +enum RangeResponseResult { + Header(PeerRequestResult>), + Body(PeerRequestResult>), } #[cfg(test)] mod tests { + use reth_primitives::BlockBody; + use super::*; use crate::test_utils::TestFullBlockClient; use std::ops::Range; diff --git a/crates/net/p2p/src/headers/client.rs b/crates/net/p2p/src/headers/client.rs index b73ea4e92..585f2ab18 100644 --- a/crates/net/p2p/src/headers/client.rs +++ b/crates/net/p2p/src/headers/client.rs @@ -27,8 +27,10 @@ pub type HeadersFut = Pin> /// The block headers downloader client #[auto_impl::auto_impl(&, Arc, Box)] pub trait HeadersClient: DownloadClient { + /// The header type this client fetches. + type Header: Send + Sync + Unpin; /// The headers future type - type Output: Future>> + Sync + Send + Unpin; + type Output: Future>> + Sync + Send + Unpin; /// Sends the header request to the p2p network and returns the header response received from a /// peer. @@ -73,11 +75,11 @@ pub struct SingleHeaderRequest { fut: Fut, } -impl Future for SingleHeaderRequest +impl Future for SingleHeaderRequest where - Fut: Future>> + Sync + Send + Unpin, + Fut: Future>> + Sync + Send + Unpin, { - type Output = PeerRequestResult>; + type Output = PeerRequestResult>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let resp = ready!(self.get_mut().fut.poll_unpin(cx)); diff --git a/crates/net/p2p/src/headers/downloader.rs b/crates/net/p2p/src/headers/downloader.rs index 5565880ed..59ecb58b8 100644 --- a/crates/net/p2p/src/headers/downloader.rs +++ b/crates/net/p2p/src/headers/downloader.rs @@ -1,5 +1,6 @@ use super::error::HeadersDownloaderResult; use crate::error::{DownloadError, DownloadResult}; +use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use alloy_primitives::B256; use futures::Stream; @@ -13,19 +14,25 @@ use reth_primitives::SealedHeader; /// /// A [`HeaderDownloader`] is a [Stream] that returns batches of headers. pub trait HeaderDownloader: - Send + Sync + Stream>> + Unpin + Send + + Sync + + Stream>, Self::Header>> + + Unpin { + /// The header type being downloaded. + type Header: Send + Sync + Unpin + 'static; + /// Updates the gap to sync which ranges from local head to the sync target /// /// See also [`HeaderDownloader::update_sync_target`] and /// [`HeaderDownloader::update_local_head`] - fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) { + fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) { self.update_local_head(head); self.update_sync_target(target); } /// Updates the block number of the local database - fn update_local_head(&mut self, head: SealedHeader); + fn update_local_head(&mut self, head: SealedHeader); /// Updates the target we want to sync to fn update_sync_target(&mut self, target: SyncTarget); @@ -74,23 +81,23 @@ impl SyncTarget { /// Validate whether the header is valid in relation to it's parent /// /// Returns Ok(false) if the -pub fn validate_header_download( - consensus: &dyn Consensus, - header: &SealedHeader, - parent: &SealedHeader, +pub fn validate_header_download( + consensus: &dyn Consensus, + header: &SealedHeader, + parent: &SealedHeader, ) -> DownloadResult<()> { // validate header against parent consensus.validate_header_against_parent(header, parent).map_err(|error| { DownloadError::HeaderValidation { hash: header.hash(), - number: header.number, + number: header.number(), error: Box::new(error), } })?; // validate header standalone consensus.validate_header(header).map_err(|error| DownloadError::HeaderValidation { hash: header.hash(), - number: header.number, + number: header.number(), error: Box::new(error), })?; Ok(()) diff --git a/crates/net/p2p/src/headers/error.rs b/crates/net/p2p/src/headers/error.rs index b22aae924..8757bb215 100644 --- a/crates/net/p2p/src/headers/error.rs +++ b/crates/net/p2p/src/headers/error.rs @@ -3,19 +3,19 @@ use reth_consensus::ConsensusError; use reth_primitives::SealedHeader; /// Header downloader result -pub type HeadersDownloaderResult = Result; +pub type HeadersDownloaderResult = Result>; /// Error variants that can happen when sending requests to a session. #[derive(Debug, Clone, Eq, PartialEq, Display, Error)] -pub enum HeadersDownloaderError { +pub enum HeadersDownloaderError { /// The downloaded header cannot be attached to the local head, /// but is valid otherwise. #[display("valid downloaded header cannot be attached to the local head: {error}")] DetachedHead { /// The local head we attempted to attach to. - local_head: Box, + local_head: Box>, /// The header we attempted to attach. - header: Box, + header: Box>, /// The error that occurred when attempting to attach the header. #[error(source)] error: Box, diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2ba8012f0..98d83c2d1 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -52,3 +52,14 @@ pub use headers::client::HeadersClient; pub trait BlockClient: HeadersClient + BodiesClient + Unpin + Clone {} impl BlockClient for T where T: HeadersClient + BodiesClient + Unpin + Clone {} + +/// The [`BlockClient`] providing Ethereum block parts. +pub trait EthBlockClient: + BlockClient
+{ +} + +impl EthBlockClient for T where + T: BlockClient
+{ +} diff --git a/crates/net/p2p/src/test_utils/bodies.rs b/crates/net/p2p/src/test_utils/bodies.rs index cfd292129..0689d403f 100644 --- a/crates/net/p2p/src/test_utils/bodies.rs +++ b/crates/net/p2p/src/test_utils/bodies.rs @@ -36,6 +36,7 @@ impl BodiesClient for TestBodiesClient where F: Fn(Vec) -> PeerRequestResult> + Send + Sync, { + type Body = BlockBody; type Output = BodiesFut; fn get_block_bodies_with_priority( diff --git a/crates/net/p2p/src/test_utils/full_block.rs b/crates/net/p2p/src/test_utils/full_block.rs index 8a13f6932..97d867531 100644 --- a/crates/net/p2p/src/test_utils/full_block.rs +++ b/crates/net/p2p/src/test_utils/full_block.rs @@ -40,6 +40,7 @@ impl DownloadClient for NoopFullBlockClient { /// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct. impl BodiesClient for NoopFullBlockClient { + type Body = BlockBody; /// Defines the output type of the function. type Output = futures::future::Ready>>; @@ -65,6 +66,7 @@ impl BodiesClient for NoopFullBlockClient { } impl HeadersClient for NoopFullBlockClient { + type Header = Header; /// The output type representing a future containing a peer request result with a vector of /// headers. type Output = futures::future::Ready>>; @@ -152,6 +154,7 @@ impl DownloadClient for TestFullBlockClient { /// Implements the `HeadersClient` trait for the `TestFullBlockClient` struct. impl HeadersClient for TestFullBlockClient { + type Header = Header; /// Specifies the associated output type. type Output = futures::future::Ready>>; @@ -205,6 +208,7 @@ impl HeadersClient for TestFullBlockClient { /// Implements the `BodiesClient` trait for the `TestFullBlockClient` struct. impl BodiesClient for TestFullBlockClient { + type Body = BlockBody; /// Defines the output type of the function. type Output = futures::future::Ready>>; diff --git a/crates/net/p2p/src/test_utils/headers.rs b/crates/net/p2p/src/test_utils/headers.rs index 4f603f633..d8d4bbc6b 100644 --- a/crates/net/p2p/src/test_utils/headers.rs +++ b/crates/net/p2p/src/test_utils/headers.rs @@ -62,6 +62,8 @@ impl TestHeaderDownloader { } impl HeaderDownloader for TestHeaderDownloader { + type Header = Header; + fn update_local_head(&mut self, _head: SealedHeader) {} fn update_sync_target(&mut self, _target: SyncTarget) {} @@ -72,7 +74,7 @@ impl HeaderDownloader for TestHeaderDownloader { } impl Stream for TestHeaderDownloader { - type Item = HeadersDownloaderResult>; + type Item = HeadersDownloaderResult, Header>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -229,6 +231,7 @@ impl DownloadClient for TestHeadersClient { } impl HeadersClient for TestHeadersClient { + type Header = Header; type Output = TestHeadersFut; fn get_headers_with_priority( diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 856f86c6f..ec4912fdd 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -760,7 +760,7 @@ where /// necessary pub async fn max_block(&self, client: C) -> eyre::Result> where - C: HeadersClient, + C: HeadersClient
, { self.node_config().max_block(client, self.provider_factory().clone()).await } diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 3591868dd..d8405dad7 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -12,7 +12,7 @@ use reth_downloaders::{ use reth_evm::execute::BlockExecutorProvider; use reth_exex::ExExManagerHandle; use reth_network_p2p::{ - bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient, + bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, EthBlockClient, }; use reth_provider::{providers::ProviderNodeTypes, ProviderFactory}; use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet}; @@ -38,7 +38,7 @@ pub fn build_networked_pipeline( ) -> eyre::Result> where N: ProviderNodeTypes, - Client: BlockClient + 'static, + Client: EthBlockClient + 'static, Executor: BlockExecutorProvider, { // building network downloaders using the fetch client @@ -84,8 +84,8 @@ pub fn build_pipeline( ) -> eyre::Result> where N: ProviderNodeTypes, - H: HeaderDownloader + 'static, - B: BodyDownloader + 'static, + H: HeaderDownloader
+ 'static, + B: BodyDownloader + 'static, Executor: BlockExecutorProvider, { let mut builder = Pipeline::::builder(); diff --git a/crates/node/core/Cargo.toml b/crates/node/core/Cargo.toml index 1c6c9d98c..c667a5629 100644 --- a/crates/node/core/Cargo.toml +++ b/crates/node/core/Cargo.toml @@ -13,7 +13,9 @@ workspace = true [dependencies] # reth reth-chainspec.workspace = true +reth-consensus.workspace = true reth-primitives.workspace = true +reth-primitives-traits.workspace = true reth-cli-util.workspace = true reth-db = { workspace = true, features = ["mdbx"] } reth-storage-errors.workspace = true @@ -30,7 +32,6 @@ reth-discv4.workspace = true reth-discv5.workspace = true reth-net-nat.workspace = true reth-network-peers.workspace = true -reth-consensus-common.workspace = true reth-prune-types.workspace = true reth-stages-types.workspace = true diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index 3848772c4..24d5588b6 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -8,6 +8,7 @@ use crate::{ dirs::{ChainPath, DataDirPath}, utils::get_single_header, }; +use alloy_consensus::BlockHeader; use eyre::eyre; use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET}; use reth_config::config::PruneConfig; @@ -273,7 +274,7 @@ impl NodeConfig { ) -> eyre::Result> where Provider: HeaderProvider, - Client: HeadersClient, + Client: HeadersClient, { let max_block = if let Some(block) = self.debug.max_block { Some(block) @@ -332,7 +333,7 @@ impl NodeConfig { ) -> ProviderResult where Provider: HeaderProvider, - Client: HeadersClient, + Client: HeadersClient, { let header = provider.header_by_hash_or_number(tip.into())?; @@ -342,7 +343,7 @@ impl NodeConfig { return Ok(header.number) } - Ok(self.fetch_tip_from_network(client, tip.into()).await.number) + Ok(self.fetch_tip_from_network(client, tip.into()).await.number()) } /// Attempt to look up the block with the given number and return the header. @@ -352,9 +353,9 @@ impl NodeConfig { &self, client: Client, tip: BlockHashOrNumber, - ) -> SealedHeader + ) -> SealedHeader where - Client: HeadersClient, + Client: HeadersClient, { info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); let mut fetch_failures = 0; diff --git a/crates/node/core/src/utils.rs b/crates/node/core/src/utils.rs index a04d4e324..7aeb14c4c 100644 --- a/crates/node/core/src/utils.rs +++ b/crates/node/core/src/utils.rs @@ -1,12 +1,12 @@ //! Utility functions for node startup and shutdown, for example path parsing and retrieving single //! blocks from the network. +use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use alloy_primitives::Sealable; use alloy_rpc_types_engine::{JwtError, JwtSecret}; use eyre::Result; -use reth_chainspec::ChainSpec; -use reth_consensus_common::validation::validate_block_pre_execution; +use reth_consensus::Consensus; use reth_network_p2p::{ bodies::client::BodiesClient, headers::client::{HeadersClient, HeadersDirection, HeadersRequest}, @@ -16,7 +16,6 @@ use reth_primitives::{SealedBlock, SealedHeader}; use std::{ env::VarError, path::{Path, PathBuf}, - sync::Arc, }; use tracing::{debug, info}; @@ -41,9 +40,9 @@ pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result( client: Client, id: BlockHashOrNumber, -) -> Result +) -> Result> where - Client: HeadersClient, + Client: HeadersClient, { let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id }; @@ -61,7 +60,7 @@ where let valid = match id { BlockHashOrNumber::Hash(hash) => header.hash() == hash, - BlockHashOrNumber::Number(number) => header.number == number, + BlockHashOrNumber::Number(number) => header.number() == number, }; if !valid { @@ -77,11 +76,11 @@ where } /// Get a body from network based on header -pub async fn get_single_body( +pub async fn get_single_body( client: Client, - chain_spec: Arc, - header: SealedHeader, -) -> Result + header: SealedHeader, + consensus: impl Consensus, +) -> Result> where Client: BodiesClient, { @@ -95,7 +94,7 @@ where let body = response.unwrap(); let block = SealedBlock { header, body }; - validate_block_pre_execution(&block, &chain_spec)?; + consensus.validate_block_pre_execution(&block)?; Ok(block) } diff --git a/crates/primitives-traits/src/block/header.rs b/crates/primitives-traits/src/block/header.rs index 8ad85a596..7ab76f249 100644 --- a/crates/primitives-traits/src/block/header.rs +++ b/crates/primitives-traits/src/block/header.rs @@ -5,6 +5,8 @@ use core::fmt; use alloy_primitives::Sealable; use reth_codecs::Compact; +use crate::InMemorySize; + /// Helper trait that unifies all behaviour required by block header to support full node /// operations. pub trait FullBlockHeader: BlockHeader + Compact {} @@ -21,12 +23,11 @@ pub trait BlockHeader: + fmt::Debug + PartialEq + Eq - + serde::Serialize - + for<'de> serde::Deserialize<'de> + alloy_rlp::Encodable + alloy_rlp::Decodable + alloy_consensus::BlockHeader + Sealable + + InMemorySize { } @@ -45,5 +46,6 @@ impl BlockHeader for T where + alloy_rlp::Decodable + alloy_consensus::BlockHeader + Sealable + + InMemorySize { } diff --git a/crates/primitives-traits/src/header/sealed.rs b/crates/primitives-traits/src/header/sealed.rs index 995c13748..b0fe44342 100644 --- a/crates/primitives-traits/src/header/sealed.rs +++ b/crates/primitives-traits/src/header/sealed.rs @@ -56,10 +56,10 @@ impl SealedHeader { } } -impl SealedHeader { +impl SealedHeader { /// Return the number hash tuple. pub fn num_hash(&self) -> BlockNumHash { - BlockNumHash::new(self.number, self.hash) + BlockNumHash::new(self.number(), self.hash) } } diff --git a/crates/primitives-traits/src/size.rs b/crates/primitives-traits/src/size.rs index 173f8cedc..0c250688e 100644 --- a/crates/primitives-traits/src/size.rs +++ b/crates/primitives-traits/src/size.rs @@ -3,3 +3,9 @@ pub trait InMemorySize { /// Returns a heuristic for the in-memory size of a struct. fn size(&self) -> usize; } + +impl InMemorySize for alloy_consensus::Header { + fn size(&self) -> usize { + self.size() + } +} diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 275f86c5b..c0586ed6a 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -428,8 +428,7 @@ impl SealedBlock { } } -impl InMemorySize for SealedBlock { - /// Calculates a heuristic for the in-memory size of the [`SealedBlock`]. +impl InMemorySize for SealedBlock { #[inline] fn size(&self) -> usize { self.header.size() + self.body.size() diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 06a525091..021a9ab19 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -60,7 +60,7 @@ pub struct BodyStage { /// The body downloader. downloader: D, /// Block response buffer. - buffer: Option>, + buffer: Option>>, } impl BodyStage { @@ -70,9 +70,10 @@ impl BodyStage { } } -impl Stage for BodyStage +impl Stage for BodyStage where Provider: DBProvider + StaticFileProviderFactory + StatsReader + BlockReader, + D: BodyDownloader, { /// Return the id of the stage fn id(&self) -> StageId { @@ -889,6 +890,8 @@ mod tests { } impl BodyDownloader for TestBodyDownloader { + type Body = BlockBody; + fn set_download_range( &mut self, range: RangeInclusive, @@ -909,7 +912,7 @@ mod tests { } impl Stream for TestBodyDownloader { - type Item = BodyDownloaderResult; + type Item = BodyDownloaderResult; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 49e687a96..2be78b881 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -194,7 +194,7 @@ where impl Stage for HeaderStage where P: HeaderSyncGapProvider, - D: HeaderDownloader, + D: HeaderDownloader
, Provider: DBProvider + StaticFileProviderFactory, { /// Return the id of the stage @@ -441,7 +441,9 @@ mod tests { } } - impl StageTestRunner for HeadersTestRunner { + impl + 'static> StageTestRunner + for HeadersTestRunner + { type S = HeaderStage, D>; fn db(&self) -> &TestStageDB { @@ -459,7 +461,9 @@ mod tests { } } - impl ExecuteStageTestRunner for HeadersTestRunner { + impl + 'static> ExecuteStageTestRunner + for HeadersTestRunner + { type Seed = Vec; fn seed_execution(&mut self, input: ExecInput) -> Result { @@ -537,7 +541,9 @@ mod tests { } } - impl UnwindStageTestRunner for HeadersTestRunner { + impl + 'static> UnwindStageTestRunner + for HeadersTestRunner + { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { self.check_no_header_entry_above(input.unwind_to) }