feat(sync): download peer penalization (#427)

* feat(sync): download peer penalization

* peer penalization

* add tracing on penalization

* add trace on request

* rename consensus back

* clippy

* fix tests

* nit: download result

* nit: fix comment

* rename penalize() to report_bad_message() and move DownloadError

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Author: Roman Krasiuk
Date: 2022-12-15 10:42:18 +02:00
Committed by: GitHub
Parent: f2707d32b5
Commit: 22dc50e5f6
19 changed files with 406 additions and 282 deletions

View File

@ -72,9 +72,10 @@ impl Command {
BodyStage {
downloader: Arc::new(bodies::concurrent::ConcurrentDownloader::new(
fetch_client.clone(),
consensus.clone(),
)),
consensus: consensus.clone(),
batch_size: 100,
commit_threshold: 100,
},
false,
)

View File

@ -1,13 +1,12 @@
use crate::p2p::error::PeerRequestResult;
use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use std::fmt::Debug;
/// A client capable of downloading block bodies.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug {
pub trait BodiesClient: DownloadClient {
/// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>>;
}
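The response wrapper returned by the client carries the responding peer, which is what makes penalization possible further down the stack. A minimal usage sketch, not part of this commit: `fetch_one` is a hypothetical helper, the unchecked `remove(0)` mirrors the downloader code later in this diff, and `DownloadResult` is the alias introduced further below.

use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::{bodies::client::BodiesClient, error::DownloadResult};
use reth_primitives::{PeerId, H256};

// Hypothetical helper: request one body and keep the responding peer so the caller
// can report_bad_message(peer_id) later if the body fails validation.
async fn fetch_one<C: BodiesClient>(client: &C, hash: H256) -> DownloadResult<(PeerId, BlockBody)> {
    let (peer_id, mut bodies) = client.get_block_body(vec![hash]).await?.split();
    // Exactly one hash was requested; an empty response is not handled in this sketch.
    Ok((peer_id, bodies.remove(0)))
}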

View File

@ -1,21 +1,11 @@
use super::client::BodiesClient;
use crate::p2p::error::RequestResult;
use reth_eth_wire::BlockBody;
use reth_primitives::{BlockNumber, H256};
use std::pin::Pin;
use tokio_stream::Stream;
use crate::p2p::downloader::{DownloadStream, Downloader};
use reth_primitives::{BlockLocked, SealedHeader};
/// A downloader capable of fetching block bodies from header hashes.
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [BodiesClient] represents a client capable of fulfilling these requests.
pub trait BodyDownloader: Sync + Send {
/// The [BodiesClient] used to fetch the block bodies
type Client: BodiesClient;
/// The block bodies client
fn client(&self) -> &Self::Client;
pub trait BodyDownloader: Downloader {
/// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive).
///
/// The returned stream will always emit bodies in the order they were requested, but multiple
@ -29,13 +19,9 @@ pub trait BodyDownloader: Sync + Send {
///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched for any reason
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a;
}
/// A stream of block bodies.
pub type BodiesStream<'a> =
Pin<Box<dyn Stream<Item = RequestResult<(BlockNumber, H256, BlockBody)>> + Send + 'a>>;
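With `bodies_stream` now yielding fully assembled blocks instead of `(BlockNumber, H256, BlockBody)` tuples, a consumer only has to drive the stream. A minimal consumer sketch, not part of this commit; `collect_bodies` is a hypothetical helper and the paths follow this diff.

use futures_util::TryStreamExt;
use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::DownloadResult};
use reth_primitives::{BlockLocked, SealedHeader};

// Hypothetical helper: collect every downloaded block, surfacing the first DownloadError.
async fn collect_bodies<D: BodyDownloader>(
    downloader: &D,
    headers: &[SealedHeader],
) -> DownloadResult<Vec<BlockLocked>> {
    downloader.bodies_stream(headers.iter()).try_collect::<Vec<BlockLocked>>().await
}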

View File

@ -1,17 +1,26 @@
use super::headers::error::DownloadError;
use crate::consensus::Consensus;
use futures::Stream;
use std::pin::Pin;
use reth_primitives::PeerId;
use std::{fmt::Debug, pin::Pin};
use super::error::DownloadResult;
/// A stream of download responses.
pub type DownloadStream<T> = Pin<Box<dyn Stream<Item = Result<T, DownloadError>> + Send>>;
pub type DownloadStream<'a, T> = Pin<Box<dyn Stream<Item = DownloadResult<T>> + Send + 'a>>;
/// Generic download client for peer penalization
pub trait DownloadClient: Send + Sync + Debug {
/// Penalize the peer for responding with a message
/// that violates validation rules
fn report_bad_message(&self, peer_id: PeerId);
}
/// The generic trait for requesting and verifying data
/// over p2p network client
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Send + Sync {
/// The client used to fetch necessary data
type Client;
type Client: DownloadClient;
/// The Consensus used to verify data validity when downloading
type Consensus: Consensus;
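Every client now declares how a misbehaving peer is reported, while downloaders stay agnostic of the reputation mechanism behind it. A minimal sketch of a custom implementation, mirroring the no-op test clients later in this diff; `LoggingClient` and its trace target are illustrative only.

use reth_interfaces::p2p::downloader::DownloadClient;
use reth_primitives::PeerId;

#[derive(Debug)]
struct LoggingClient;

impl DownloadClient for LoggingClient {
    fn report_bad_message(&self, peer_id: PeerId) {
        // A real client (e.g. FetchClient below) forwards this to the peer manager;
        // this sketch only logs the offending peer.
        tracing::trace!(target: "downloaders", "bad message from peer {peer_id}");
    }
}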

View File

@ -1,4 +1,6 @@
use reth_primitives::WithPeerId;
use crate::consensus;
use reth_primitives::{rpc::BlockNumber, WithPeerId, H256};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
/// Result alias for result of a request.
@ -8,7 +10,7 @@ pub type RequestResult<T> = Result<T, RequestError>;
pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error, Clone)]
#[derive(Debug, Error, Clone)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel to the peer.")]
@ -45,3 +47,59 @@ impl From<oneshot::error::RecvError> for RequestError {
RequestError::ChannelClosed
}
}
/// The download result type
pub type DownloadResult<T> = Result<T, DownloadError>;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {error}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
#[source]
error: consensus::Error,
},
/// Block validation failed
#[error("Failed to validate body for header {hash}. Details: {error}.")]
BlockValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
#[source]
error: consensus::Error,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request.")]
Timeout,
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
/// Received empty response while expecting headers
#[error("Received empty header response.")]
EmptyResponse,
/// Received an invalid tip
#[error("Received invalid tip: {received:?}. Expected {expected:?}.")]
InvalidTip {
/// The hash of the received tip
received: H256,
/// The hash of the expected tip
expected: H256,
},
/// Error while executing the request.
#[error(transparent)]
RequestError(#[from] RequestError),
}
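`DownloadError` now lives next to `RequestError` and separates validation failures (bad peer data) from transport failures. A handling sketch, not part of this commit; `handle` is a hypothetical caller and the `tracing` calls are illustrative.

use reth_interfaces::p2p::error::{DownloadError, DownloadResult};
use reth_primitives::BlockLocked;

// Hypothetical caller: log a failed download and drop the block.
fn handle(result: DownloadResult<BlockLocked>) -> Option<BlockLocked> {
    match result {
        Ok(block) => Some(block),
        // The peer sent data that failed consensus checks; the downloaders in this
        // commit call report_bad_message before returning these variants.
        Err(DownloadError::HeaderValidation { hash, error })
        | Err(DownloadError::BlockValidation { hash, error }) => {
            tracing::warn!("invalid data for {hash}: {error}");
            None
        }
        // Transport-level problems (timeouts, closed channels) bubble up unchanged.
        Err(DownloadError::RequestError(err)) => {
            tracing::warn!("request failed: {err}");
            None
        }
        Err(other) => {
            tracing::warn!("download failed: {other}");
            None
        }
    }
}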

View File

@ -1,4 +1,4 @@
use crate::p2p::error::PeerRequestResult;
use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult};
use async_trait::async_trait;
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256};
@ -19,7 +19,7 @@ pub struct HeadersRequest {
/// The block headers downloader client
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: Send + Sync + Debug {
pub trait HeadersClient: DownloadClient {
/// Sends the header request to the p2p network and returns the header response received from a
/// peer.
async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders>;

View File

@ -2,7 +2,7 @@ use crate::{
consensus::Consensus,
p2p::{
downloader::{DownloadStream, Downloader},
headers::error::DownloadError,
error::{DownloadError, DownloadResult},
},
};
@ -21,10 +21,10 @@ pub trait HeaderDownloader: Downloader {
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<SealedHeader>;
) -> DownloadStream<'_, SealedHeader>;
/// Validate whether the header is valid in relation to its parent
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(self.consensus(), header, parent)?;
Ok(())
}
@ -38,7 +38,7 @@ pub fn validate_header_download<C: Consensus>(
consensus: &C,
header: &SealedHeader,
parent: &SealedHeader,
) -> Result<(), DownloadError> {
) -> DownloadResult<()> {
ensure_parent(header, parent)?;
consensus
.validate_header(header, parent)
@ -47,7 +47,7 @@ pub fn validate_header_download<C: Consensus>(
}
/// Ensures that the given `parent` header is the actual parent of the `header`
pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Err(DownloadError::MismatchedHeaders {
header_number: header.number.into(),

View File

@ -1,55 +0,0 @@
use crate::{consensus, p2p::error::RequestError};
use reth_primitives::{rpc::BlockNumber, H256};
use thiserror::Error;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {error}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
#[source]
error: consensus::Error,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request.")]
Timeout,
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
/// Received empty response while expecting headers
#[error("Received empty header response.")]
EmptyResponse,
/// Received an invalid tip
#[error("Received invalid tip: {received:?}. Expected {expected:?}.")]
InvalidTip {
/// The hash of the received tip
received: H256,
/// The hash of the expected tip
expected: H256,
},
/// Error while executing the request.
#[error(transparent)]
RequestError(#[from] RequestError),
}
impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal, in the cases
/// where the peer responds with no headers, or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}

View File

@ -9,6 +9,3 @@ pub mod client;
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient
pub mod downloader;
/// Error types.
pub mod error;

View File

@ -1,4 +1,6 @@
use crate::p2p::{bodies::client::BodiesClient, error::PeerRequestResult};
use crate::p2p::{
bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult,
};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
@ -16,6 +18,12 @@ impl<F> Debug for TestBodiesClient<F> {
}
}
impl<F: Sync + Send> DownloadClient for TestBodiesClient<F> {
fn report_bad_message(&self, _peer_id: reth_primitives::PeerId) {
// noop
}
}
#[async_trait]
impl<F> BodiesClient for TestBodiesClient<F>
where

View File

@ -2,12 +2,11 @@
use crate::{
consensus::{self, Consensus},
p2p::{
downloader::{DownloadStream, Downloader},
error::{PeerRequestResult, RequestError},
downloader::{DownloadClient, DownloadStream, Downloader},
error::{DownloadError, DownloadResult, PeerRequestResult, RequestError},
headers::{
client::{HeadersClient, HeadersRequest, StatusUpdater},
downloader::HeaderDownloader,
error::DownloadError,
},
},
};
@ -72,7 +71,7 @@ impl HeaderDownloader for TestHeaderDownloader {
&self,
_head: SealedHeader,
_forkchoice: ForkchoiceState,
) -> DownloadStream<SealedHeader> {
) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.create_download())
}
}
@ -104,7 +103,7 @@ impl TestDownload {
}
impl Stream for TestDownload {
type Item = Result<SealedHeader, DownloadError>;
type Item = DownloadResult<SealedHeader>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
@ -168,6 +167,12 @@ impl TestHeadersClient {
}
}
impl DownloadClient for TestHeadersClient {
fn report_bad_message(&self, _peer_id: PeerId) {
// noop
}
}
#[async_trait::async_trait]
impl HeadersClient for TestHeadersClient {
async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders> {

View File

@ -1,58 +1,77 @@
use backon::{ExponentialBackoff, Retryable};
use futures_util::{stream, StreamExt};
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::{
bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
use reth_interfaces::{
consensus::Consensus as ConsensusTrait,
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
downloader::{DownloadStream, Downloader},
error::{DownloadError, DownloadResult},
},
error::RequestResult,
};
use reth_primitives::{BlockNumber, H256};
use std::sync::Arc;
use reth_primitives::{BlockLocked, SealedHeader};
use std::{borrow::Borrow, sync::Arc};
/// Downloads bodies in batches.
///
/// All blocks in a batch are fetched at the same time.
#[derive(Debug)]
pub struct ConcurrentDownloader<C> {
pub struct ConcurrentDownloader<Client, Consensus> {
/// The bodies client
client: Arc<C>,
client: Arc<Client>,
/// The consensus client
consensus: Arc<Consensus>,
/// The number of retries for each request.
retries: usize,
/// The batch size per one request
batch_size: usize,
}
impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
type Client = C;
impl<Client, Consensus> Downloader for ConcurrentDownloader<Client, Consensus>
where
Client: BodiesClient,
Consensus: ConsensusTrait,
{
type Client = Client;
type Consensus = Consensus;
/// The block bodies client
fn client(&self) -> &Self::Client {
&self.client
self.client.borrow()
}
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
fn consensus(&self) -> &Self::Consensus {
self.consensus.borrow()
}
}
impl<Client, Consensus> BodyDownloader for ConcurrentDownloader<Client, Consensus>
where
Client: BodiesClient,
Consensus: ConsensusTrait,
{
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
Box::pin(
stream::iter(headers.into_iter().map(|(block_number, header_hash)| {
(|| self.fetch_body(*block_number, *header_hash))
stream::iter(headers.into_iter().map(|header| {
(|| self.fetch_body(header))
.retry(ExponentialBackoff::default().with_max_times(self.retries))
.when(|err| err.is_retryable())
}))
.buffered(self.batch_size),
)
}
}
impl<C: BodiesClient> ConcurrentDownloader<C> {
impl<Client, Consensus> ConcurrentDownloader<Client, Consensus>
where
Client: BodiesClient,
Consensus: ConsensusTrait,
{
/// Create a new concurrent downloader instance.
pub fn new(client: Arc<C>) -> Self {
Self { client, retries: 3, batch_size: 100 }
pub fn new(client: Arc<Client>, consensus: Arc<Consensus>) -> Self {
Self { client, consensus, retries: 3, batch_size: 100 }
}
/// Set the number of blocks to fetch at the same time.
@ -72,13 +91,24 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
}
/// Fetch a single block body.
async fn fetch_body(
&self,
block_number: BlockNumber,
header_hash: H256,
) -> RequestResult<(BlockNumber, H256, BlockBody)> {
let mut response = self.client.get_block_body(vec![header_hash]).await?;
Ok((block_number, header_hash, response.1.remove(0))) // TODO:
async fn fetch_body(&self, header: &SealedHeader) -> DownloadResult<BlockLocked> {
let (peer_id, mut response) =
self.client.get_block_body(vec![header.hash()]).await?.split();
let body = response.remove(0);
let block = BlockLocked {
header: header.clone(),
body: body.transactions,
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
};
match self.consensus.pre_validate_block(&block) {
Ok(_) => Ok(block),
Err(error) => {
self.client.report_bad_message(peer_id);
Err(DownloadError::BlockValidation { hash: header.hash(), error })
}
}
}
}
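Construction now takes both the client and the consensus instance, matching the `BodyStage` wiring at the top of this diff. A small construction sketch, not part of the commit; the `reth_downloaders` crate path and the `build_body_downloader` helper are assumptions.

use reth_downloaders::bodies::concurrent::ConcurrentDownloader; // crate path assumed
use reth_interfaces::{consensus::Consensus, p2p::bodies::client::BodiesClient};
use std::sync::Arc;

// Hypothetical wrapper: the defaults are retries = 3 and batch_size = 100,
// as set by ConcurrentDownloader::new above.
fn build_body_downloader<C: BodiesClient, CS: Consensus>(
    client: Arc<C>,
    consensus: Arc<CS>,
) -> ConcurrentDownloader<C, CS> {
    ConcurrentDownloader::new(client, consensus).with_retries(3)
}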
@ -86,10 +116,14 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{generate_bodies, TestClient};
use crate::test_utils::{generate_bodies, TestBodiesClient};
use assert_matches::assert_matches;
use futures_util::stream::{StreamExt, TryStreamExt};
use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError};
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{bodies::downloader::BodyDownloader, error::RequestError},
test_utils::TestConsensus,
};
use reth_primitives::{PeerId, H256};
use std::{
sync::{
@ -104,9 +138,10 @@ mod tests {
#[tokio::test]
async fn emits_bodies_in_order() {
// Generate some random blocks
let (hashes, mut bodies) = generate_bodies(0..20);
let (headers, mut bodies) = generate_bodies(0..20);
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: Vec<H256>| {
let downloader = ConcurrentDownloader::new(
Arc::new(TestBodiesClient::new(|hash: Vec<H256>| {
let mut bodies = bodies.clone();
async move {
// Simulate that the request for this (random) block takes 0-100ms
@ -120,22 +155,29 @@ mod tests {
)
.into())
}
})));
})),
Arc::new(TestConsensus::default()),
);
assert_matches!(
downloader
.bodies_stream(hashes.iter())
.try_collect::<Vec<(BlockNumber, H256, BlockBody)>>()
.bodies_stream(headers.clone().iter())
.try_collect::<Vec<BlockLocked>>()
.await,
Ok(responses) => {
assert_eq!(
responses,
hashes
headers
.into_iter()
.map(|(num, hash)| {
(num, hash, bodies.remove(&hash).unwrap())
.map(|header| {
let body = bodies.remove(&header.hash()).unwrap();
BlockLocked {
header,
body: body.transactions,
ommers: body.ommers.into_iter().map(|o| o.seal()).collect(),
}
})
.collect::<Vec<(BlockNumber, H256, BlockBody)>>()
.collect::<Vec<BlockLocked>>()
);
}
);
@ -144,14 +186,16 @@ mod tests {
/// Checks that non-retryable errors bubble up
#[tokio::test]
async fn client_failure() {
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|_: Vec<H256>| async {
let downloader = ConcurrentDownloader::new(
Arc::new(TestBodiesClient::new(|_: Vec<H256>| async {
Err(RequestError::ChannelClosed)
})));
})),
Arc::new(TestConsensus::default()),
);
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(RequestError::ChannelClosed))
downloader.bodies_stream(&[SealedHeader::default()]).next().await,
Some(Err(DownloadError::RequestError(RequestError::ChannelClosed)))
);
}
@ -159,8 +203,8 @@ mod tests {
#[tokio::test]
async fn retries_timeouts() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec<H256>| {
let downloader = ConcurrentDownloader::new(
Arc::new(TestBodiesClient::new(|mut header_hash: Vec<H256>| {
let retries_left = retries_left.clone();
let _header_hash = header_hash.remove(0);
async move {
@ -175,17 +219,14 @@ mod tests {
.into())
}
}
})));
})),
Arc::new(TestConsensus::default()),
);
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
downloader.bodies_stream(&[SealedHeader::default()]).next().await,
Some(Ok(body)) => {
assert_eq!(body.0, 0);
assert_eq!(body.1, H256::zero());
assert_eq!(body.2, BlockBody {
transactions: vec![],
ommers: vec![]
})
assert_eq!(body, BlockLocked::default());
}
);
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0);
@ -195,8 +236,8 @@ mod tests {
#[tokio::test]
async fn too_many_retries() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec<H256>| {
let downloader = ConcurrentDownloader::new(
Arc::new(TestBodiesClient::new(|mut header_hash: Vec<H256>| {
let _header_hash = header_hash.remove(0);
let retries_left = retries_left.clone();
async move {
@ -211,12 +252,14 @@ mod tests {
.into())
}
}
})))
})),
Arc::new(TestConsensus::default()),
)
.with_retries(0);
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(RequestError::Timeout))
downloader.bodies_stream(&[SealedHeader::default()]).next().await,
Some(Err(DownloadError::RequestError(RequestError::Timeout)))
);
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2);
}

View File

@ -3,11 +3,10 @@ use reth_interfaces::{
consensus::Consensus,
p2p::{
downloader::{DownloadStream, Downloader},
error::PeerRequestResult,
error::{DownloadError, DownloadResult, PeerRequestResult},
headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader},
error::DownloadError,
},
},
};
@ -62,7 +61,7 @@ where
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<SealedHeader> {
) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.new_download(head, forkchoice))
}
}
@ -208,6 +207,10 @@ where
// queue in the first request
let client = Arc::clone(&self.client);
let req = self.headers_request();
tracing::trace!(
target: "downloaders::headers",
"requesting headers {req:?}"
);
HeadersRequestFuture {
request: req.clone(),
fut: Box::pin(async move { client.get_headers(req).await }),
@ -243,7 +246,7 @@ where
///
/// Returns Ok(false) if the
#[allow(clippy::result_large_err)]
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(&self.consensus, header, parent)?;
Ok(())
}
@ -252,7 +255,7 @@ where
fn process_header_response(
&mut self,
response: PeerRequestResult<BlockHeaders>,
) -> Result<(), DownloadError> {
) -> DownloadResult<()> {
match response {
Ok(res) => {
let mut headers = res.1 .0;
@ -290,6 +293,8 @@ where
}
Ok(())
}
// most likely a noop, because this error
// would've been handled by the fetcher internally
Err(err) => Err(err.into()),
}
}
@ -300,7 +305,7 @@ where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Item = Result<SealedHeader, DownloadError>;
type Item = DownloadResult<SealedHeader>;
/// Linear header downloader implemented as a [Stream]. The downloader sends header
/// requests until the head is reached and buffers the responses. If the request future
@ -330,6 +335,7 @@ where
let mut fut = this.get_or_init_fut();
match fut.poll_unpin(cx) {
Poll::Ready(result) => {
let peer_id = result.as_ref().map(|res| res.peer_id()).ok();
// Process the response, buffering the headers
// in case of successful validation
match this.process_header_response(result) {
@ -342,6 +348,14 @@ where
}
}
Err(err) => {
// Penalize the peer for bad response
if let Some(peer_id) = peer_id {
tracing::trace!(
target: "downloaders::headers",
"penalizing peer {peer_id} for {err:?}"
);
this.client.report_bad_message(peer_id);
}
// Response is invalid, attempt to retry
if this.try_fuse_request_fut(&mut fut).is_err() {
tracing::trace!(

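The new traces are emitted under the `downloaders::headers` target, so penalization events can be surfaced without enabling trace logging globally. A subscriber sketch, not part of this commit, assuming `tracing-subscriber` with its `env-filter` feature is available:

fn main() {
    // Show only the downloader traces added in this commit.
    tracing_subscriber::fmt()
        .with_env_filter("downloaders::headers=trace")
        .init();
}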
View File

@ -3,10 +3,10 @@
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{bodies::client::BodiesClient, error::PeerRequestResult},
p2p::{bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult},
test_utils::generators::random_block_range,
};
use reth_primitives::{BlockNumber, H256};
use reth_primitives::{PeerId, SealedHeader, H256};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
@ -18,12 +18,11 @@ use tokio::sync::Mutex;
/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
rng: std::ops::Range<u64>,
) -> (Vec<(BlockNumber, H256)>, HashMap<H256, BlockBody>) {
) -> (Vec<SealedHeader>, HashMap<H256, BlockBody>) {
let blocks = random_block_range(rng, H256::zero());
let hashes: Vec<(BlockNumber, H256)> =
blocks.iter().map(|block| (block.number, block.hash())).collect();
let bodies: HashMap<H256, BlockBody> = blocks
let headers = blocks.iter().map(|block| block.header.clone()).collect();
let bodies = blocks
.into_iter()
.map(|block| {
(
@ -36,26 +35,32 @@ pub(crate) fn generate_bodies(
})
.collect();
(hashes, bodies)
(headers, bodies)
}
/// A [BodiesClient] for testing.
pub(crate) struct TestClient<F>(pub(crate) Arc<Mutex<F>>);
pub(crate) struct TestBodiesClient<F>(pub(crate) Arc<Mutex<F>>);
impl<F> Debug for TestClient<F> {
impl<F> Debug for TestBodiesClient<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestClient").finish_non_exhaustive()
f.debug_struct("TestBodiesClient").finish_non_exhaustive()
}
}
impl<F> TestClient<F> {
impl<F> TestBodiesClient<F> {
pub(crate) fn new(f: F) -> Self {
Self(Arc::new(Mutex::new(f)))
}
}
impl<F: Send + Sync> DownloadClient for TestBodiesClient<F> {
fn report_bad_message(&self, _peer_id: PeerId) {
// noop
}
}
#[async_trait]
impl<F, Fut> BodiesClient for TestClient<F>
impl<F, Fut> BodiesClient for TestBodiesClient<F>
where
F: FnMut(Vec<H256>) -> Fut + Send + Sync,
Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send,

View File

@ -1,13 +1,17 @@
//! A client implementation that can interact with the network and download data.
use crate::fetch::DownloadRequest;
use crate::{
fetch::DownloadRequest,
peers::{PeersHandle, ReputationChangeKind},
};
use reth_eth_wire::{BlockBody, BlockHeaders};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
downloader::DownloadClient,
error::PeerRequestResult,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_primitives::{WithPeerId, H256};
use reth_primitives::{PeerId, WithPeerId, H256};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Front-end API for fetching data from the network.
@ -15,6 +19,14 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
pub struct FetchClient {
/// Sender half of the request channel.
pub(crate) request_tx: UnboundedSender<DownloadRequest>,
/// The handle to the peers
pub(crate) peers_handle: PeersHandle,
}
impl DownloadClient for FetchClient {
fn report_bad_message(&self, peer_id: PeerId) {
self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
}
}
#[async_trait::async_trait]

View File

@ -1,6 +1,6 @@
//! Fetch data from the network.
use crate::message::BlockRequest;
use crate::{message::BlockRequest, peers::PeersHandle};
use futures::StreamExt;
use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders};
use reth_interfaces::p2p::{
@ -32,6 +32,8 @@ pub struct StateFetcher {
HashMap<PeerId, Request<Vec<H256>, PeerRequestResult<Vec<BlockBody>>>>,
/// The list of available peers for requests.
peers: HashMap<PeerId, Peer>,
/// The handle to the peers manager
peers_handle: PeersHandle,
/// Requests queued for processing
queued_requests: VecDeque<DownloadRequest>,
/// Receiver for new incoming download requests
@ -43,6 +45,19 @@ pub struct StateFetcher {
// === impl StateSyncer ===
impl StateFetcher {
pub(crate) fn new(peers_handle: PeersHandle) -> Self {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
peers: Default::default(),
peers_handle,
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
download_requests_tx,
}
}
/// Invoked when connected to a new peer.
pub(crate) fn new_active_peer(&mut self, peer_id: PeerId, best_hash: H256, best_number: u64) {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
@ -221,20 +236,9 @@ impl StateFetcher {
/// Returns a new [`FetchClient`] that can send requests to this type.
pub(crate) fn client(&self) -> FetchClient {
FetchClient { request_tx: self.download_requests_tx.clone() }
}
}
impl Default for StateFetcher {
fn default() -> Self {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
peers: Default::default(),
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
download_requests_tx,
FetchClient {
request_tx: self.download_requests_tx.clone(),
peers_handle: self.peers_handle.clone(),
}
}
}
@ -350,12 +354,15 @@ pub(crate) enum BlockResponseOutcome {
#[cfg(test)]
mod tests {
use crate::{peers::PeersManager, PeersConfig};
use super::*;
use std::future::poll_fn;
#[tokio::test(flavor = "multi_thread")]
async fn test_poll_fetcher() {
let mut fetcher = StateFetcher::default();
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle());
poll_fn(move |cx| {
assert!(fetcher.poll(cx).is_pending());

View File

@ -70,6 +70,7 @@ where
peers_manager: PeersManager,
genesis_hash: H256,
) -> Self {
let state_fetcher = StateFetcher::new(peers_manager.handle());
Self {
active_peers: Default::default(),
peers_manager,
@ -77,7 +78,7 @@ where
client,
discovery,
genesis_hash,
state_fetcher: Default::default(),
state_fetcher,
}
}

View File

@ -2,7 +2,7 @@ use crate::{
db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use futures_util::TryStreamExt;
use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT},
@ -13,10 +13,10 @@ use reth_db::{
use reth_interfaces::{consensus::Consensus, p2p::bodies::downloader::BodyDownloader};
use reth_primitives::{
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT},
BlockLocked, BlockNumber, SealedHeader, H256,
BlockNumber, SealedHeader,
};
use std::{fmt::Debug, sync::Arc};
use tracing::warn;
use tracing::{error, warn};
const BODIES: StageId = StageId("Bodies");
@ -61,7 +61,7 @@ pub struct BodyStage<D: BodyDownloader, C: Consensus> {
///
/// Smaller batch sizes result in less memory usage, but more disk I/O. Larger batch sizes
/// result in more memory usage, less disk I/O, and more infrequent checkpoints.
pub batch_size: u64,
pub commit_threshold: u64,
}
#[async_trait::async_trait]
@ -91,7 +91,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let starting_block = previous_block + 1;
// Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.batch_size);
let target = previous_stage_progress.min(starting_block + self.commit_threshold);
if target <= previous_block {
return Ok(ExecOutput { stage_progress: target, reached_tip: true, done: true })
}
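`commit_threshold` (formerly `batch_size`) caps how far the stage runs before committing, bounded by the previous stage's progress. A worked example with hypothetical numbers, not part of the commit:

fn main() {
    // Body stage last committed block 1_000; the previous stage reached 5_000.
    let previous_block: u64 = 1_000;
    let previous_stage_progress: u64 = 5_000;
    let commit_threshold: u64 = 100;
    let starting_block = previous_block + 1;
    // Same expression as in the stage code above.
    let target = previous_stage_progress.min(starting_block + commit_threshold);
    assert_eq!(target, 1_101); // bodies for blocks 1_001..=1_101 are processed, then committed
}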
@ -106,45 +106,37 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Get id for the first transaction in the block
let mut first_tx_id = db.get_first_tx_id(starting_block)?;
// Cursor used to look up headers for block pre-validation
let mut header_cursor = db.cursor::<tables::Headers>()?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some((block_number, header_hash, body)) =
bodies_stream.try_next().await.map_err(|err| StageError::Internal(err.into()))?
{
// Fetch the block header for pre-validation
let block = BlockLocked {
header: SealedHeader::new(
header_cursor
.seek_exact((block_number, header_hash).into())?
.ok_or(DatabaseIntegrityError::Header {
number: block_number,
hash: header_hash,
})?
.1,
header_hash,
),
body: body.transactions,
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
while let Some(result) = bodies_stream.next().await {
let block = match result {
Ok(block) => block,
Err(err) => {
error!(
"Encountered error downloading block {}. Details: {:?}",
highest_block + 1,
err
);
// Exit the stage early
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
}
};
// Pre-validate the block and unwind if it is invalid
self.consensus
.pre_validate_block(&block)
.map_err(|err| StageError::Validation { block: block_number, error: err })?;
let block_number = block.number;
// Write block
let key = (block_number, header_hash).into();
let key = (block_number, block.hash()).into();
// Additional +1, increments tx count to allow indexing of ChangeSet that contains block
// reward. This can't be added to last transaction ChangeSet as it would
// break if block is empty.
let this_tx_count = first_tx_id +
block.body.len() as u64 +
if self.consensus.has_block_reward(block_number) { 1 } else { 0 };
if self.consensus.has_block_reward(block.number) { 1 } else { 0 };
tx_count_cursor.append(key, this_tx_count)?;
ommers_cursor.append(
key,
@ -229,7 +221,7 @@ impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
starting_block: BlockNumber,
target: BlockNumber,
) -> Result<Vec<(BlockNumber, H256)>, StageError> {
) -> Result<Vec<SealedHeader>, StageError> {
let mut header_cursor = tx.cursor::<tables::Headers>()?;
let mut header_hashes_cursor = tx.cursor::<tables::CanonicalHeaders>()?;
let mut walker = header_hashes_cursor
@ -238,15 +230,18 @@ impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
let mut bodies_to_download = Vec::new();
while let Some(Ok((block_number, header_hash))) = walker.next() {
let header = header_cursor
.seek_exact((block_number, header_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: block_number, hash: header_hash })?
.1;
let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or(
DatabaseIntegrityError::Header { number: block_number, hash: header_hash },
)?;
if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT {
// TODO: fix this
// If we indeed move to the new changeset structure let's not forget to add a note
// that the gaps issue with the returned empty bodies stream is no longer present
continue
}
bodies_to_download.push((block_number, header_hash));
bodies_to_download.push(SealedHeader::new(header, header_hash));
}
Ok(bodies_to_download)
@ -261,7 +256,10 @@ mod tests {
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::{consensus, p2p::error::RequestError};
use reth_interfaces::{
consensus,
p2p::error::{DownloadError, RequestError},
};
use std::collections::HashMap;
use test_utils::*;
@ -368,21 +366,34 @@ mod tests {
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
/// Checks that the stage asks to unwind if pre-validation of the block fails.
/// Checks that the stage returns to the pipeline on validation failure.
#[tokio::test]
async fn pre_validation_failure() {
let (stage_progress, previous_stage) = (1, 20);
// Set up test runner
let mut runner = BodyTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
runner.seed_execution(input).expect("failed to seed execution");
let blocks = runner.seed_execution(input).expect("failed to seed execution");
// Fail validation
runner.consensus.set_fail_validation(true);
let responses = blocks
.iter()
.map(|b| {
(
b.hash(),
Err(DownloadError::BlockValidation {
hash: b.hash(),
error: consensus::Error::BaseFeeMissing,
}),
)
})
.collect::<HashMap<_, _>>();
runner.set_responses(responses);
// Run the stage
let rx = runner.execute(input);
@ -390,7 +401,8 @@ mod tests {
// Check that the error bubbles up
assert_matches!(
rx.await.unwrap(),
Err(StageError::Validation { error: consensus::Error::BaseFeeMissing, .. })
Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false })
if out_stage_progress == stage_progress
);
assert!(runner.validate_execution(input, None).is_ok(), "execution validation");
}
@ -467,13 +479,20 @@ mod tests {
// overwrite responses
let header = blocks.last().unwrap();
runner.set_responses(HashMap::from([(header.hash(), Err(RequestError::Timeout))]));
runner.set_responses(HashMap::from([(
header.hash(),
Err(DownloadError::RequestError(RequestError::Timeout)),
)]));
// Run the stage
let rx = runner.execute(input);
// Check that the error bubbles up
assert_matches!(rx.await.unwrap(), Err(StageError::Internal(_)));
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false })
if out_stage_progress == stage_progress
);
assert!(runner.validate_execution(input, None).is_ok(), "execution validation");
}
@ -496,11 +515,9 @@ mod tests {
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
},
error::{PeerRequestResult, RequestResult},
bodies::{client::BodiesClient, downloader::BodyDownloader},
downloader::{DownloadClient, DownloadStream, Downloader},
error::{DownloadResult, PeerRequestResult},
},
test_utils::{
generators::{random_block_range, random_signed_tx},
@ -514,7 +531,7 @@ mod tests {
pub(crate) const GENESIS_HASH: H256 = H256::zero();
/// A helper to create a collection of resulted-wrapped block bodies keyed by their hash.
pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, RequestResult<BlockBody>) {
pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, DownloadResult<BlockBody>) {
(
block.hash(),
Ok(BlockBody {
@ -527,7 +544,7 @@ mod tests {
/// A helper struct for running the [BodyStage].
pub(crate) struct BodyTestRunner {
pub(crate) consensus: Arc<TestConsensus>,
responses: HashMap<H256, RequestResult<BlockBody>>,
responses: HashMap<H256, DownloadResult<BlockBody>>,
db: TestStageDB,
batch_size: u64,
}
@ -550,7 +567,7 @@ mod tests {
pub(crate) fn set_responses(
&mut self,
responses: HashMap<H256, RequestResult<BlockBody>>,
responses: HashMap<H256, DownloadResult<BlockBody>>,
) {
self.responses = responses;
}
@ -567,7 +584,7 @@ mod tests {
BodyStage {
downloader: Arc::new(TestBodyDownloader::new(self.responses.clone())),
consensus: self.consensus.clone(),
batch_size: self.batch_size,
commit_threshold: self.batch_size,
}
}
}
@ -733,6 +750,12 @@ mod tests {
#[derive(Debug)]
pub(crate) struct NoopClient;
impl DownloadClient for NoopClient {
fn report_bad_message(&self, _: reth_primitives::PeerId) {
panic!("Noop client should not be called")
}
}
#[async_trait::async_trait]
impl BodiesClient for NoopClient {
async fn get_block_body(&self, _: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
@ -744,38 +767,47 @@ mod tests {
/// A [BodyDownloader] that is backed by an internal [HashMap] for testing.
#[derive(Debug, Default, Clone)]
pub(crate) struct TestBodyDownloader {
responses: HashMap<H256, RequestResult<BlockBody>>,
responses: HashMap<H256, DownloadResult<BlockBody>>,
}
impl TestBodyDownloader {
pub(crate) fn new(responses: HashMap<H256, RequestResult<BlockBody>>) -> Self {
pub(crate) fn new(responses: HashMap<H256, DownloadResult<BlockBody>>) -> Self {
Self { responses }
}
}
impl BodyDownloader for TestBodyDownloader {
impl Downloader for TestBodyDownloader {
type Client = NoopClient;
type Consensus = TestConsensus;
fn client(&self) -> &Self::Client {
unreachable!()
}
fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> BodiesStream<'a>
fn consensus(&self) -> &Self::Consensus {
unreachable!()
}
}
impl BodyDownloader for TestBodyDownloader {
fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockLocked>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
Box::pin(futures_util::stream::iter(hashes.into_iter().map(
|(block_number, hash)| {
Box::pin(futures_util::stream::iter(hashes.into_iter().map(|header| {
let result = self
.responses
.get(hash)
.get(&header.hash())
.expect("Stage tried downloading a block we do not have.")
.clone()?;
Ok((*block_number, *hash, result))
},
)))
Ok(BlockLocked {
header: header.clone(),
body: result.transactions,
ommers: result.ommers.into_iter().map(|header| header.seal()).collect(),
})
})))
}
}
}

View File

@ -12,10 +12,12 @@ use reth_db::{
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::headers::{
p2p::{
error::DownloadError,
headers::{
client::{HeadersClient, StatusUpdater},
downloader::{ensure_parent, HeaderDownloader},
error::DownloadError,
},
},
};
use reth_primitives::{BlockNumber, SealedHeader, H256, U256};