diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b42694336..9f9f953b4 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -72,9 +72,10 @@ impl Command { BodyStage { downloader: Arc::new(bodies::concurrent::ConcurrentDownloader::new( fetch_client.clone(), + consensus.clone(), )), consensus: consensus.clone(), - batch_size: 100, + commit_threshold: 100, }, false, ) diff --git a/crates/interfaces/src/p2p/bodies/client.rs b/crates/interfaces/src/p2p/bodies/client.rs index 1a5915257..69d440eff 100644 --- a/crates/interfaces/src/p2p/bodies/client.rs +++ b/crates/interfaces/src/p2p/bodies/client.rs @@ -1,13 +1,12 @@ -use crate::p2p::error::PeerRequestResult; +use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult}; use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; -use std::fmt::Debug; /// A client capable of downloading block bodies. #[async_trait] #[auto_impl::auto_impl(&, Arc, Box)] -pub trait BodiesClient: Send + Sync + Debug { +pub trait BodiesClient: DownloadClient { /// Fetches the block body for the requested block. async fn get_block_body(&self, hash: Vec) -> PeerRequestResult>; } diff --git a/crates/interfaces/src/p2p/bodies/downloader.rs b/crates/interfaces/src/p2p/bodies/downloader.rs index 81716abb7..5308471e0 100644 --- a/crates/interfaces/src/p2p/bodies/downloader.rs +++ b/crates/interfaces/src/p2p/bodies/downloader.rs @@ -1,21 +1,11 @@ -use super::client::BodiesClient; -use crate::p2p::error::RequestResult; -use reth_eth_wire::BlockBody; -use reth_primitives::{BlockNumber, H256}; -use std::pin::Pin; -use tokio_stream::Stream; +use crate::p2p::downloader::{DownloadStream, Downloader}; +use reth_primitives::{BlockLocked, SealedHeader}; /// A downloader capable of fetching block bodies from header hashes. /// /// A downloader represents a distinct strategy for submitting requests to download block bodies, /// while a [BodiesClient] represents a client capable of fulfilling these requests. -pub trait BodyDownloader: Sync + Send { - /// The [BodiesClient] used to fetch the block bodies - type Client: BodiesClient; - - /// The block bodies client - fn client(&self) -> &Self::Client; - +pub trait BodyDownloader: Downloader { /// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive). /// /// The returned stream will always emit bodies in the order they were requested, but multiple @@ -29,13 +19,9 @@ pub trait BodyDownloader: Sync + Send { /// /// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close /// the stream before the entire range has been fetched for any reason - fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a> + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> where - I: IntoIterator, + I: IntoIterator, ::IntoIter: Send + 'b, 'b: 'a; } - -/// A stream of block bodies. -pub type BodiesStream<'a> = - Pin> + Send + 'a>>; diff --git a/crates/interfaces/src/p2p/downloader.rs b/crates/interfaces/src/p2p/downloader.rs index c0ac54662..f74c0794e 100644 --- a/crates/interfaces/src/p2p/downloader.rs +++ b/crates/interfaces/src/p2p/downloader.rs @@ -1,17 +1,26 @@ -use super::headers::error::DownloadError; use crate::consensus::Consensus; use futures::Stream; -use std::pin::Pin; +use reth_primitives::PeerId; +use std::{fmt::Debug, pin::Pin}; + +use super::error::DownloadResult; /// A stream for downloading response. -pub type DownloadStream = Pin> + Send>>; +pub type DownloadStream<'a, T> = Pin> + Send + 'a>>; + +/// Generic download client for peer penalization +pub trait DownloadClient: Send + Sync + Debug { + /// Penalize the peer for responding with a message + /// that violates validation rules + fn report_bad_message(&self, peer_id: PeerId); +} /// The generic trait for requesting and verifying data /// over p2p network client #[auto_impl::auto_impl(&, Arc, Box)] pub trait Downloader: Send + Sync { /// The client used to fetch necessary data - type Client; + type Client: DownloadClient; /// The Consensus used to verify data validity when downloading type Consensus: Consensus; diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 7eda1d532..c30b8d17a 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -1,4 +1,6 @@ -use reth_primitives::WithPeerId; +use crate::consensus; +use reth_primitives::{rpc::BlockNumber, WithPeerId, H256}; +use thiserror::Error; use tokio::sync::{mpsc, oneshot}; /// Result alias for result of a request. @@ -8,7 +10,7 @@ pub type RequestResult = Result; pub type PeerRequestResult = RequestResult>; /// Error variants that can happen when sending requests to a session. -#[derive(Debug, thiserror::Error, Clone)] +#[derive(Debug, Error, Clone)] #[allow(missing_docs)] pub enum RequestError { #[error("Closed channel to the peer.")] @@ -45,3 +47,59 @@ impl From for RequestError { RequestError::ChannelClosed } } + +/// The download result type +pub type DownloadResult = Result; + +/// The downloader error type +#[derive(Error, Debug, Clone)] +pub enum DownloadError { + /// Header validation failed + #[error("Failed to validate header {hash}. Details: {error}.")] + HeaderValidation { + /// Hash of header failing validation + hash: H256, + /// The details of validation failure + #[source] + error: consensus::Error, + }, + /// Block validation failed + #[error("Failed to validate body for header {hash}. Details: {error}.")] + BlockValidation { + /// Hash of header failing validation + hash: H256, + /// The details of validation failure + #[source] + error: consensus::Error, + }, + /// Timed out while waiting for request id response. + #[error("Timed out while getting headers for request.")] + Timeout, + /// Error when checking that the current [`Header`] has the parent's hash as the parent_hash + /// field, and that they have sequential block numbers. + #[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")] + MismatchedHeaders { + /// The header number being evaluated + header_number: BlockNumber, + /// The header hash being evaluated + header_hash: H256, + /// The parent number being evaluated + parent_number: BlockNumber, + /// The parent hash being evaluated + parent_hash: H256, + }, + /// Received empty response while expecting headers + #[error("Received empty header response.")] + EmptyResponse, + /// Received an invalid tip + #[error("Received invalid tip: {received:?}. Expected {expected:?}.")] + InvalidTip { + /// The hash of the received tip + received: H256, + /// The hash of the expected tip + expected: H256, + }, + /// Error while executing the request. + #[error(transparent)] + RequestError(#[from] RequestError), +} diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs index 3f60c7708..b1d6737ed 100644 --- a/crates/interfaces/src/p2p/headers/client.rs +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -1,4 +1,4 @@ -use crate::p2p::error::PeerRequestResult; +use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult}; use async_trait::async_trait; pub use reth_eth_wire::BlockHeaders; use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256}; @@ -19,7 +19,7 @@ pub struct HeadersRequest { /// The block headers downloader client #[async_trait] #[auto_impl::auto_impl(&, Arc, Box)] -pub trait HeadersClient: Send + Sync + Debug { +pub trait HeadersClient: DownloadClient { /// Sends the header request to the p2p network and returns the header response received from a /// peer. async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult; diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs index edf921f05..03244bbb3 100644 --- a/crates/interfaces/src/p2p/headers/downloader.rs +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -2,7 +2,7 @@ use crate::{ consensus::Consensus, p2p::{ downloader::{DownloadStream, Downloader}, - headers::error::DownloadError, + error::{DownloadError, DownloadResult}, }, }; @@ -21,10 +21,10 @@ pub trait HeaderDownloader: Downloader { &self, head: SealedHeader, forkchoice: ForkchoiceState, - ) -> DownloadStream; + ) -> DownloadStream<'_, SealedHeader>; /// Validate whether the header is valid in relation to it's parent - fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { + fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> { validate_header_download(self.consensus(), header, parent)?; Ok(()) } @@ -38,7 +38,7 @@ pub fn validate_header_download( consensus: &C, header: &SealedHeader, parent: &SealedHeader, -) -> Result<(), DownloadError> { +) -> DownloadResult<()> { ensure_parent(header, parent)?; consensus .validate_header(header, parent) @@ -47,7 +47,7 @@ pub fn validate_header_download( } /// Ensures that the given `parent` header is the actual parent of the `header` -pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { +pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> { if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) { return Err(DownloadError::MismatchedHeaders { header_number: header.number.into(), diff --git a/crates/interfaces/src/p2p/headers/error.rs b/crates/interfaces/src/p2p/headers/error.rs deleted file mode 100644 index f945119eb..000000000 --- a/crates/interfaces/src/p2p/headers/error.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::{consensus, p2p::error::RequestError}; -use reth_primitives::{rpc::BlockNumber, H256}; -use thiserror::Error; - -/// The downloader error type -#[derive(Error, Debug, Clone)] -pub enum DownloadError { - /// Header validation failed - #[error("Failed to validate header {hash}. Details: {error}.")] - HeaderValidation { - /// Hash of header failing validation - hash: H256, - /// The details of validation failure - #[source] - error: consensus::Error, - }, - /// Timed out while waiting for request id response. - #[error("Timed out while getting headers for request.")] - Timeout, - /// Error when checking that the current [`Header`] has the parent's hash as the parent_hash - /// field, and that they have sequential block numbers. - #[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")] - MismatchedHeaders { - /// The header number being evaluated - header_number: BlockNumber, - /// The header hash being evaluated - header_hash: H256, - /// The parent number being evaluated - parent_number: BlockNumber, - /// The parent hash being evaluated - parent_hash: H256, - }, - /// Received empty response while expecting headers - #[error("Received empty header response.")] - EmptyResponse, - /// Received an invalid tip - #[error("Received invalid tip: {received:?}. Expected {expected:?}.")] - InvalidTip { - /// The hash of the received tip - received: H256, - /// The hash of the expected tip - expected: H256, - }, - /// Error while executing the request. - #[error(transparent)] - RequestError(#[from] RequestError), -} - -impl DownloadError { - /// Returns bool indicating whether this error is retryable or fatal, in the cases - /// where the peer responds with no headers, or times out. - pub fn is_retryable(&self) -> bool { - matches!(self, DownloadError::Timeout { .. }) - } -} diff --git a/crates/interfaces/src/p2p/headers/mod.rs b/crates/interfaces/src/p2p/headers/mod.rs index d85e6d42a..915b28ff0 100644 --- a/crates/interfaces/src/p2p/headers/mod.rs +++ b/crates/interfaces/src/p2p/headers/mod.rs @@ -9,6 +9,3 @@ pub mod client; /// [`Consensus`]: crate::consensus::Consensus /// [`HeadersClient`]: client::HeadersClient pub mod downloader; - -/// Error types. -pub mod error; diff --git a/crates/interfaces/src/test_utils/bodies.rs b/crates/interfaces/src/test_utils/bodies.rs index c3ff8d4cd..360890454 100644 --- a/crates/interfaces/src/test_utils/bodies.rs +++ b/crates/interfaces/src/test_utils/bodies.rs @@ -1,4 +1,6 @@ -use crate::p2p::{bodies::client::BodiesClient, error::PeerRequestResult}; +use crate::p2p::{ + bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult, +}; use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; @@ -16,6 +18,12 @@ impl Debug for TestBodiesClient { } } +impl DownloadClient for TestBodiesClient { + fn report_bad_message(&self, _peer_id: reth_primitives::PeerId) { + // noop + } +} + #[async_trait] impl BodiesClient for TestBodiesClient where diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index 9ad8ddfdd..e642c4bab 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -2,12 +2,11 @@ use crate::{ consensus::{self, Consensus}, p2p::{ - downloader::{DownloadStream, Downloader}, - error::{PeerRequestResult, RequestError}, + downloader::{DownloadClient, DownloadStream, Downloader}, + error::{DownloadError, DownloadResult, PeerRequestResult, RequestError}, headers::{ client::{HeadersClient, HeadersRequest, StatusUpdater}, downloader::HeaderDownloader, - error::DownloadError, }, }, }; @@ -72,7 +71,7 @@ impl HeaderDownloader for TestHeaderDownloader { &self, _head: SealedHeader, _forkchoice: ForkchoiceState, - ) -> DownloadStream { + ) -> DownloadStream<'_, SealedHeader> { Box::pin(self.create_download()) } } @@ -104,7 +103,7 @@ impl TestDownload { } impl Stream for TestDownload { - type Item = Result; + type Item = DownloadResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -168,6 +167,12 @@ impl TestHeadersClient { } } +impl DownloadClient for TestHeadersClient { + fn report_bad_message(&self, _peer_id: PeerId) { + // noop + } +} + #[async_trait::async_trait] impl HeadersClient for TestHeadersClient { async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult { diff --git a/crates/net/downloaders/src/bodies/concurrent.rs b/crates/net/downloaders/src/bodies/concurrent.rs index 217c20254..f55aa0ad6 100644 --- a/crates/net/downloaders/src/bodies/concurrent.rs +++ b/crates/net/downloaders/src/bodies/concurrent.rs @@ -1,58 +1,77 @@ use backon::{ExponentialBackoff, Retryable}; use futures_util::{stream, StreamExt}; -use reth_eth_wire::BlockBody; -use reth_interfaces::p2p::{ - bodies::{ - client::BodiesClient, - downloader::{BodiesStream, BodyDownloader}, +use reth_interfaces::{ + consensus::Consensus as ConsensusTrait, + p2p::{ + bodies::{client::BodiesClient, downloader::BodyDownloader}, + downloader::{DownloadStream, Downloader}, + error::{DownloadError, DownloadResult}, }, - error::RequestResult, }; -use reth_primitives::{BlockNumber, H256}; -use std::sync::Arc; +use reth_primitives::{BlockLocked, SealedHeader}; +use std::{borrow::Borrow, sync::Arc}; /// Downloads bodies in batches. /// /// All blocks in a batch are fetched at the same time. #[derive(Debug)] -pub struct ConcurrentDownloader { +pub struct ConcurrentDownloader { /// The bodies client - client: Arc, + client: Arc, + /// The consensus client + consensus: Arc, /// The number of retries for each request. retries: usize, /// The batch size per one request batch_size: usize, } -impl BodyDownloader for ConcurrentDownloader { - type Client = C; +impl Downloader for ConcurrentDownloader +where + Client: BodiesClient, + Consensus: ConsensusTrait, +{ + type Client = Client; + type Consensus = Consensus; - /// The block bodies client fn client(&self) -> &Self::Client { - &self.client + self.client.borrow() } - fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a> + fn consensus(&self) -> &Self::Consensus { + self.consensus.borrow() + } +} + +impl BodyDownloader for ConcurrentDownloader +where + Client: BodiesClient, + Consensus: ConsensusTrait, +{ + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> where - I: IntoIterator, + I: IntoIterator, ::IntoIter: Send + 'b, 'b: 'a, { Box::pin( - stream::iter(headers.into_iter().map(|(block_number, header_hash)| { - (|| self.fetch_body(*block_number, *header_hash)) + stream::iter(headers.into_iter().map(|header| { + (|| self.fetch_body(header)) .retry(ExponentialBackoff::default().with_max_times(self.retries)) - .when(|err| err.is_retryable()) })) .buffered(self.batch_size), ) } } -impl ConcurrentDownloader { +impl ConcurrentDownloader +where + Client: BodiesClient, + Consensus: ConsensusTrait, +{ /// Create a new concurrent downloader instance. - pub fn new(client: Arc) -> Self { - Self { client, retries: 3, batch_size: 100 } + pub fn new(client: Arc, consensus: Arc) -> Self { + Self { client, consensus, retries: 3, batch_size: 100 } } /// Set the number of blocks to fetch at the same time. @@ -72,13 +91,24 @@ impl ConcurrentDownloader { } /// Fetch a single block body. - async fn fetch_body( - &self, - block_number: BlockNumber, - header_hash: H256, - ) -> RequestResult<(BlockNumber, H256, BlockBody)> { - let mut response = self.client.get_block_body(vec![header_hash]).await?; - Ok((block_number, header_hash, response.1.remove(0))) // TODO: + async fn fetch_body(&self, header: &SealedHeader) -> DownloadResult { + let (peer_id, mut response) = + self.client.get_block_body(vec![header.hash()]).await?.split(); + + let body = response.remove(0); + let block = BlockLocked { + header: header.clone(), + body: body.transactions, + ommers: body.ommers.into_iter().map(|header| header.seal()).collect(), + }; + + match self.consensus.pre_validate_block(&block) { + Ok(_) => Ok(block), + Err(error) => { + self.client.report_bad_message(peer_id); + Err(DownloadError::BlockValidation { hash: header.hash(), error }) + } + } } } @@ -86,10 +116,14 @@ impl ConcurrentDownloader { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{generate_bodies, TestClient}; + use crate::test_utils::{generate_bodies, TestBodiesClient}; use assert_matches::assert_matches; use futures_util::stream::{StreamExt, TryStreamExt}; - use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError}; + use reth_eth_wire::BlockBody; + use reth_interfaces::{ + p2p::{bodies::downloader::BodyDownloader, error::RequestError}, + test_utils::TestConsensus, + }; use reth_primitives::{PeerId, H256}; use std::{ sync::{ @@ -104,38 +138,46 @@ mod tests { #[tokio::test] async fn emits_bodies_in_order() { // Generate some random blocks - let (hashes, mut bodies) = generate_bodies(0..20); + let (headers, mut bodies) = generate_bodies(0..20); - let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: Vec| { - let mut bodies = bodies.clone(); - async move { - // Simulate that the request for this (random) block takes 0-100ms - tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await; + let downloader = ConcurrentDownloader::new( + Arc::new(TestBodiesClient::new(|hash: Vec| { + let mut bodies = bodies.clone(); + async move { + // Simulate that the request for this (random) block takes 0-100ms + tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await; - Ok(( - PeerId::default(), - vec![bodies - .remove(&hash[0]) - .expect("Downloader asked for a block it should not ask for")], - ) - .into()) - } - }))); + Ok(( + PeerId::default(), + vec![bodies + .remove(&hash[0]) + .expect("Downloader asked for a block it should not ask for")], + ) + .into()) + } + })), + Arc::new(TestConsensus::default()), + ); assert_matches!( downloader - .bodies_stream(hashes.iter()) - .try_collect::>() + .bodies_stream(headers.clone().iter()) + .try_collect::>() .await, Ok(responses) => { assert_eq!( responses, - hashes + headers .into_iter() - .map(|(num, hash)| { - (num, hash, bodies.remove(&hash).unwrap()) + .map(| header | { + let body = bodies .remove(&header.hash()).unwrap(); + BlockLocked { + header, + body: body.transactions, + ommers: body.ommers.into_iter().map(|o| o.seal()).collect(), + } }) - .collect::>() + .collect::>() ); } ); @@ -144,14 +186,16 @@ mod tests { /// Checks that non-retryable errors bubble up #[tokio::test] async fn client_failure() { - let downloader = - ConcurrentDownloader::new(Arc::new(TestClient::new(|_: Vec| async { + let downloader = ConcurrentDownloader::new( + Arc::new(TestBodiesClient::new(|_: Vec| async { Err(RequestError::ChannelClosed) - }))); + })), + Arc::new(TestConsensus::default()), + ); assert_matches!( - downloader.bodies_stream(&[(0, H256::zero())]).next().await, - Some(Err(RequestError::ChannelClosed)) + downloader.bodies_stream(&[SealedHeader::default()]).next().await, + Some(Err(DownloadError::RequestError(RequestError::ChannelClosed))) ); } @@ -159,8 +203,8 @@ mod tests { #[tokio::test] async fn retries_timeouts() { let retries_left = Arc::new(AtomicUsize::new(3)); - let downloader = - ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec| { + let downloader = ConcurrentDownloader::new( + Arc::new(TestBodiesClient::new(|mut header_hash: Vec| { let retries_left = retries_left.clone(); let _header_hash = header_hash.remove(0); async move { @@ -175,17 +219,14 @@ mod tests { .into()) } } - }))); + })), + Arc::new(TestConsensus::default()), + ); assert_matches!( - downloader.bodies_stream(&[(0, H256::zero())]).next().await, + downloader.bodies_stream(&[SealedHeader::default()]).next().await, Some(Ok(body)) => { - assert_eq!(body.0, 0); - assert_eq!(body.1, H256::zero()); - assert_eq!(body.2, BlockBody { - transactions: vec![], - ommers: vec![] - }) + assert_eq!(body, BlockLocked::default()); } ); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0); @@ -195,8 +236,8 @@ mod tests { #[tokio::test] async fn too_many_retries() { let retries_left = Arc::new(AtomicUsize::new(3)); - let downloader = - ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec| { + let downloader = ConcurrentDownloader::new( + Arc::new(TestBodiesClient::new(|mut header_hash: Vec| { let _header_hash = header_hash.remove(0); let retries_left = retries_left.clone(); async move { @@ -211,12 +252,14 @@ mod tests { .into()) } } - }))) - .with_retries(0); + })), + Arc::new(TestConsensus::default()), + ) + .with_retries(0); assert_matches!( - downloader.bodies_stream(&[(0, H256::zero())]).next().await, - Some(Err(RequestError::Timeout)) + downloader.bodies_stream(&[SealedHeader::default()]).next().await, + Some(Err(DownloadError::RequestError(RequestError::Timeout))) ); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2); } diff --git a/crates/net/downloaders/src/headers/linear.rs b/crates/net/downloaders/src/headers/linear.rs index 54c3c067e..f23bc214e 100644 --- a/crates/net/downloaders/src/headers/linear.rs +++ b/crates/net/downloaders/src/headers/linear.rs @@ -3,11 +3,10 @@ use reth_interfaces::{ consensus::Consensus, p2p::{ downloader::{DownloadStream, Downloader}, - error::PeerRequestResult, + error::{DownloadError, DownloadResult, PeerRequestResult}, headers::{ client::{BlockHeaders, HeadersClient, HeadersRequest}, downloader::{validate_header_download, HeaderDownloader}, - error::DownloadError, }, }, }; @@ -62,7 +61,7 @@ where &self, head: SealedHeader, forkchoice: ForkchoiceState, - ) -> DownloadStream { + ) -> DownloadStream<'_, SealedHeader> { Box::pin(self.new_download(head, forkchoice)) } } @@ -208,6 +207,10 @@ where // queue in the first request let client = Arc::clone(&self.client); let req = self.headers_request(); + tracing::trace!( + target: "downloaders::headers", + "requesting headers {req:?}" + ); HeadersRequestFuture { request: req.clone(), fut: Box::pin(async move { client.get_headers(req).await }), @@ -243,7 +246,7 @@ where /// /// Returns Ok(false) if the #[allow(clippy::result_large_err)] - fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { + fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> { validate_header_download(&self.consensus, header, parent)?; Ok(()) } @@ -252,7 +255,7 @@ where fn process_header_response( &mut self, response: PeerRequestResult, - ) -> Result<(), DownloadError> { + ) -> DownloadResult<()> { match response { Ok(res) => { let mut headers = res.1 .0; @@ -290,6 +293,8 @@ where } Ok(()) } + // most likely a noop, because this error + // would've been handled by the fetcher internally Err(err) => Err(err.into()), } } @@ -300,7 +305,7 @@ where C: Consensus + 'static, H: HeadersClient + 'static, { - type Item = Result; + type Item = DownloadResult; /// Linear header downloader implemented as a [Stream]. The downloader sends header /// requests until the head is reached and buffers the responses. If the request future @@ -330,6 +335,7 @@ where let mut fut = this.get_or_init_fut(); match fut.poll_unpin(cx) { Poll::Ready(result) => { + let peer_id = result.as_ref().map(|res| res.peer_id()).ok(); // Process the response, buffering the headers // in case of successful validation match this.process_header_response(result) { @@ -342,6 +348,14 @@ where } } Err(err) => { + // Penalize the peer for bad response + if let Some(peer_id) = peer_id { + tracing::trace!( + target: "downloaders::headers", + "penalizing peer {peer_id} for {err:?}" + ); + this.client.report_bad_message(peer_id); + } // Response is invalid, attempt to retry if this.try_fuse_request_fut(&mut fut).is_err() { tracing::trace!( diff --git a/crates/net/downloaders/src/test_utils.rs b/crates/net/downloaders/src/test_utils.rs index 4bc383b9b..ecf2af5cd 100644 --- a/crates/net/downloaders/src/test_utils.rs +++ b/crates/net/downloaders/src/test_utils.rs @@ -3,10 +3,10 @@ use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_interfaces::{ - p2p::{bodies::client::BodiesClient, error::PeerRequestResult}, + p2p::{bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult}, test_utils::generators::random_block_range, }; -use reth_primitives::{BlockNumber, H256}; +use reth_primitives::{PeerId, SealedHeader, H256}; use std::{ collections::HashMap, fmt::{Debug, Formatter}, @@ -18,12 +18,11 @@ use tokio::sync::Mutex; /// Generate a set of bodies and their corresponding block hashes pub(crate) fn generate_bodies( rng: std::ops::Range, -) -> (Vec<(BlockNumber, H256)>, HashMap) { +) -> (Vec, HashMap) { let blocks = random_block_range(rng, H256::zero()); - let hashes: Vec<(BlockNumber, H256)> = - blocks.iter().map(|block| (block.number, block.hash())).collect(); - let bodies: HashMap = blocks + let headers = blocks.iter().map(|block| block.header.clone()).collect(); + let bodies = blocks .into_iter() .map(|block| { ( @@ -36,26 +35,32 @@ pub(crate) fn generate_bodies( }) .collect(); - (hashes, bodies) + (headers, bodies) } /// A [BodiesClient] for testing. -pub(crate) struct TestClient(pub(crate) Arc>); +pub(crate) struct TestBodiesClient(pub(crate) Arc>); -impl Debug for TestClient { +impl Debug for TestBodiesClient { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TestClient").finish_non_exhaustive() + f.debug_struct("TestBodiesClient").finish_non_exhaustive() } } -impl TestClient { +impl TestBodiesClient { pub(crate) fn new(f: F) -> Self { Self(Arc::new(Mutex::new(f))) } } +impl DownloadClient for TestBodiesClient { + fn report_bad_message(&self, _peer_id: PeerId) { + // noop + } +} + #[async_trait] -impl BodiesClient for TestClient +impl BodiesClient for TestBodiesClient where F: FnMut(Vec) -> Fut + Send + Sync, Fut: Future>> + Send, diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 3977bebac..0e3c57daa 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -1,13 +1,17 @@ //! A client implementation that can interact with the network and download data. -use crate::fetch::DownloadRequest; +use crate::{ + fetch::DownloadRequest, + peers::{PeersHandle, ReputationChangeKind}, +}; use reth_eth_wire::{BlockBody, BlockHeaders}; use reth_interfaces::p2p::{ bodies::client::BodiesClient, + downloader::DownloadClient, error::PeerRequestResult, headers::client::{HeadersClient, HeadersRequest}, }; -use reth_primitives::{WithPeerId, H256}; +use reth_primitives::{PeerId, WithPeerId, H256}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Front-end API for fetching data from the network. @@ -15,6 +19,14 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot}; pub struct FetchClient { /// Sender half of the request channel. pub(crate) request_tx: UnboundedSender, + /// The handle to the peers + pub(crate) peers_handle: PeersHandle, +} + +impl DownloadClient for FetchClient { + fn report_bad_message(&self, peer_id: PeerId) { + self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage); + } } #[async_trait::async_trait] diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 21aa312e8..7b28a7cea 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -1,6 +1,6 @@ //! Fetch data from the network. -use crate::message::BlockRequest; +use crate::{message::BlockRequest, peers::PeersHandle}; use futures::StreamExt; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ @@ -32,6 +32,8 @@ pub struct StateFetcher { HashMap, PeerRequestResult>>>, /// The list of available peers for requests. peers: HashMap, + /// The handle to the peers manager + peers_handle: PeersHandle, /// Requests queued for processing queued_requests: VecDeque, /// Receiver for new incoming download requests @@ -43,6 +45,19 @@ pub struct StateFetcher { // === impl StateSyncer === impl StateFetcher { + pub(crate) fn new(peers_handle: PeersHandle) -> Self { + let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); + Self { + inflight_headers_requests: Default::default(), + inflight_bodies_requests: Default::default(), + peers: Default::default(), + peers_handle, + queued_requests: Default::default(), + download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), + download_requests_tx, + } + } + /// Invoked when connected to a new peer. pub(crate) fn new_active_peer(&mut self, peer_id: PeerId, best_hash: H256, best_number: u64) { self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); @@ -221,20 +236,9 @@ impl StateFetcher { /// Returns a new [`FetchClient`] that can send requests to this type. pub(crate) fn client(&self) -> FetchClient { - FetchClient { request_tx: self.download_requests_tx.clone() } - } -} - -impl Default for StateFetcher { - fn default() -> Self { - let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); - Self { - inflight_headers_requests: Default::default(), - inflight_bodies_requests: Default::default(), - peers: Default::default(), - queued_requests: Default::default(), - download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), - download_requests_tx, + FetchClient { + request_tx: self.download_requests_tx.clone(), + peers_handle: self.peers_handle.clone(), } } } @@ -350,12 +354,15 @@ pub(crate) enum BlockResponseOutcome { #[cfg(test)] mod tests { + use crate::{peers::PeersManager, PeersConfig}; + use super::*; use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] async fn test_poll_fetcher() { - let mut fetcher = StateFetcher::default(); + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = StateFetcher::new(manager.handle()); poll_fn(move |cx| { assert!(fetcher.poll(cx).is_pending()); diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 7b41b464d..a0bd87819 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -70,6 +70,7 @@ where peers_manager: PeersManager, genesis_hash: H256, ) -> Self { + let state_fetcher = StateFetcher::new(peers_manager.handle()); Self { active_peers: Default::default(), peers_manager, @@ -77,7 +78,7 @@ where client, discovery, genesis_hash, - state_fetcher: Default::default(), + state_fetcher, } } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index ba0d94b57..47952c0bd 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -2,7 +2,7 @@ use crate::{ db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; -use futures_util::TryStreamExt; +use futures_util::StreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, @@ -13,10 +13,10 @@ use reth_db::{ use reth_interfaces::{consensus::Consensus, p2p::bodies::downloader::BodyDownloader}; use reth_primitives::{ proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, - BlockLocked, BlockNumber, SealedHeader, H256, + BlockNumber, SealedHeader, }; use std::{fmt::Debug, sync::Arc}; -use tracing::warn; +use tracing::{error, warn}; const BODIES: StageId = StageId("Bodies"); @@ -61,7 +61,7 @@ pub struct BodyStage { /// /// Smaller batch sizes result in less memory usage, but more disk I/O. Larger batch sizes /// result in more memory usage, less disk I/O, and more infrequent checkpoints. - pub batch_size: u64, + pub commit_threshold: u64, } #[async_trait::async_trait] @@ -91,7 +91,7 @@ impl Stage for BodyStage Stage for BodyStage()?; - // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // on every iteration of the while loop -_- let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); let mut highest_block = previous_block; - while let Some((block_number, header_hash, body)) = - bodies_stream.try_next().await.map_err(|err| StageError::Internal(err.into()))? - { - // Fetch the block header for pre-validation - let block = BlockLocked { - header: SealedHeader::new( - header_cursor - .seek_exact((block_number, header_hash).into())? - .ok_or(DatabaseIntegrityError::Header { - number: block_number, - hash: header_hash, - })? - .1, - header_hash, - ), - body: body.transactions, - ommers: body.ommers.into_iter().map(|header| header.seal()).collect(), + while let Some(result) = bodies_stream.next().await { + let block = match result { + Ok(block) => block, + Err(err) => { + error!( + "Encountered error downloading block {}. Details: {:?}", + highest_block + 1, + err + ); + // Exit the stage early + return Ok(ExecOutput { + stage_progress: highest_block, + done: false, + reached_tip: false, + }) + } }; - // Pre-validate the block and unwind if it is invalid - self.consensus - .pre_validate_block(&block) - .map_err(|err| StageError::Validation { block: block_number, error: err })?; - + let block_number = block.number; // Write block - let key = (block_number, header_hash).into(); + let key = (block_number, block.hash()).into(); // Additional +1, increments tx count to allow indexing of ChangeSet that contains block // reward. This can't be added to last transaction ChangeSet as it would // break if block is empty. let this_tx_count = first_tx_id + block.body.len() as u64 + - if self.consensus.has_block_reward(block_number) { 1 } else { 0 }; + if self.consensus.has_block_reward(block.number) { 1 } else { 0 }; tx_count_cursor.append(key, this_tx_count)?; ommers_cursor.append( key, @@ -229,7 +221,7 @@ impl BodyStage { tx: &mut >::TXMut, starting_block: BlockNumber, target: BlockNumber, - ) -> Result, StageError> { + ) -> Result, StageError> { let mut header_cursor = tx.cursor::()?; let mut header_hashes_cursor = tx.cursor::()?; let mut walker = header_hashes_cursor @@ -238,15 +230,18 @@ impl BodyStage { let mut bodies_to_download = Vec::new(); while let Some(Ok((block_number, header_hash))) = walker.next() { - let header = header_cursor - .seek_exact((block_number, header_hash).into())? - .ok_or(DatabaseIntegrityError::Header { number: block_number, hash: header_hash })? - .1; + let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or( + DatabaseIntegrityError::Header { number: block_number, hash: header_hash }, + )?; + if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT { + // TODO: fix this + // If we indeed move to the new changeset structure let's not forget to add a note + // that the gaps issue with the returned empty bodies stream is no longer present continue } - bodies_to_download.push((block_number, header_hash)); + bodies_to_download.push(SealedHeader::new(header, header_hash)); } Ok(bodies_to_download) @@ -261,7 +256,10 @@ mod tests { PREV_STAGE_ID, }; use assert_matches::assert_matches; - use reth_interfaces::{consensus, p2p::error::RequestError}; + use reth_interfaces::{ + consensus, + p2p::error::{DownloadError, RequestError}, + }; use std::collections::HashMap; use test_utils::*; @@ -368,21 +366,34 @@ mod tests { assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); } - /// Checks that the stage asks to unwind if pre-validation of the block fails. + /// Checks that the stage returns to the pipeline on validation failure. #[tokio::test] async fn pre_validation_failure() { let (stage_progress, previous_stage) = (1, 20); // Set up test runner let mut runner = BodyTestRunner::default(); + let input = ExecInput { previous_stage: Some((PREV_STAGE_ID, previous_stage)), stage_progress: Some(stage_progress), }; - runner.seed_execution(input).expect("failed to seed execution"); + let blocks = runner.seed_execution(input).expect("failed to seed execution"); // Fail validation - runner.consensus.set_fail_validation(true); + let responses = blocks + .iter() + .map(|b| { + ( + b.hash(), + Err(DownloadError::BlockValidation { + hash: b.hash(), + error: consensus::Error::BaseFeeMissing, + }), + ) + }) + .collect::>(); + runner.set_responses(responses); // Run the stage let rx = runner.execute(input); @@ -390,7 +401,8 @@ mod tests { // Check that the error bubbles up assert_matches!( rx.await.unwrap(), - Err(StageError::Validation { error: consensus::Error::BaseFeeMissing, .. }) + Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + if out_stage_progress == stage_progress ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); } @@ -467,13 +479,20 @@ mod tests { // overwrite responses let header = blocks.last().unwrap(); - runner.set_responses(HashMap::from([(header.hash(), Err(RequestError::Timeout))])); + runner.set_responses(HashMap::from([( + header.hash(), + Err(DownloadError::RequestError(RequestError::Timeout)), + )])); // Run the stage let rx = runner.execute(input); // Check that the error bubbles up - assert_matches!(rx.await.unwrap(), Err(StageError::Internal(_))); + assert_matches!( + rx.await.unwrap(), + Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + if out_stage_progress == stage_progress + ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); } @@ -496,11 +515,9 @@ mod tests { use reth_eth_wire::BlockBody; use reth_interfaces::{ p2p::{ - bodies::{ - client::BodiesClient, - downloader::{BodiesStream, BodyDownloader}, - }, - error::{PeerRequestResult, RequestResult}, + bodies::{client::BodiesClient, downloader::BodyDownloader}, + downloader::{DownloadClient, DownloadStream, Downloader}, + error::{DownloadResult, PeerRequestResult}, }, test_utils::{ generators::{random_block_range, random_signed_tx}, @@ -514,7 +531,7 @@ mod tests { pub(crate) const GENESIS_HASH: H256 = H256::zero(); /// A helper to create a collection of resulted-wrapped block bodies keyed by their hash. - pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, RequestResult) { + pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, DownloadResult) { ( block.hash(), Ok(BlockBody { @@ -527,7 +544,7 @@ mod tests { /// A helper struct for running the [BodyStage]. pub(crate) struct BodyTestRunner { pub(crate) consensus: Arc, - responses: HashMap>, + responses: HashMap>, db: TestStageDB, batch_size: u64, } @@ -550,7 +567,7 @@ mod tests { pub(crate) fn set_responses( &mut self, - responses: HashMap>, + responses: HashMap>, ) { self.responses = responses; } @@ -567,7 +584,7 @@ mod tests { BodyStage { downloader: Arc::new(TestBodyDownloader::new(self.responses.clone())), consensus: self.consensus.clone(), - batch_size: self.batch_size, + commit_threshold: self.batch_size, } } } @@ -733,6 +750,12 @@ mod tests { #[derive(Debug)] pub(crate) struct NoopClient; + impl DownloadClient for NoopClient { + fn report_bad_message(&self, _: reth_primitives::PeerId) { + panic!("Noop client should not be called") + } + } + #[async_trait::async_trait] impl BodiesClient for NoopClient { async fn get_block_body(&self, _: Vec) -> PeerRequestResult> { @@ -744,38 +767,47 @@ mod tests { /// A [BodyDownloader] that is backed by an internal [HashMap] for testing. #[derive(Debug, Default, Clone)] pub(crate) struct TestBodyDownloader { - responses: HashMap>, + responses: HashMap>, } impl TestBodyDownloader { - pub(crate) fn new(responses: HashMap>) -> Self { + pub(crate) fn new(responses: HashMap>) -> Self { Self { responses } } } - impl BodyDownloader for TestBodyDownloader { + impl Downloader for TestBodyDownloader { type Client = NoopClient; + type Consensus = TestConsensus; fn client(&self) -> &Self::Client { unreachable!() } - fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> BodiesStream<'a> + fn consensus(&self) -> &Self::Consensus { + unreachable!() + } + } + + impl BodyDownloader for TestBodyDownloader { + fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockLocked> where - I: IntoIterator, + I: IntoIterator, ::IntoIter: Send + 'b, 'b: 'a, { - Box::pin(futures_util::stream::iter(hashes.into_iter().map( - |(block_number, hash)| { - let result = self - .responses - .get(hash) - .expect("Stage tried downloading a block we do not have.") - .clone()?; - Ok((*block_number, *hash, result)) - }, - ))) + Box::pin(futures_util::stream::iter(hashes.into_iter().map(|header| { + let result = self + .responses + .get(&header.hash()) + .expect("Stage tried downloading a block we do not have.") + .clone()?; + Ok(BlockLocked { + header: header.clone(), + body: result.transactions, + ommers: result.ommers.into_iter().map(|header| header.seal()).collect(), + }) + }))) } } } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 7e59fd82a..7301ff40a 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -12,10 +12,12 @@ use reth_db::{ }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, - p2p::headers::{ - client::{HeadersClient, StatusUpdater}, - downloader::{ensure_parent, HeaderDownloader}, + p2p::{ error::DownloadError, + headers::{ + client::{HeadersClient, StatusUpdater}, + downloader::{ensure_parent, HeaderDownloader}, + }, }, }; use reth_primitives::{BlockNumber, SealedHeader, H256, U256};