feat: convert HeadersClient BodiesClient futures into associated types (#1063)

This commit is contained in:
Aurélien
2023-01-30 10:25:15 +01:00
committed by GitHub
parent 0c2225956d
commit e2ac4d3f3c
15 changed files with 214 additions and 129 deletions

1
Cargo.lock generated
View File

@ -4124,7 +4124,6 @@ name = "reth-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"futures",
"futures-util",
"metrics",

View File

@ -1,21 +1,25 @@
use std::pin::Pin;
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use async_trait::async_trait;
use futures::Future;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
/// The bodies future type
pub type BodiesFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send + Sync>>;
/// A client capable of downloading block bodies.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: DownloadClient {
/// The bodies type
type Output: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Sync + Send + Unpin;
/// Fetches the block body for the requested block.
async fn get_block_bodies(&self, hashes: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
self.get_block_bodies_with_priority(hashes, Priority::Normal).await
fn get_block_bodies(&self, hashes: Vec<H256>) -> Self::Output {
self.get_block_bodies_with_priority(hashes, Priority::Normal)
}
/// Fetches the block body for the requested block with priority
async fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>>;
fn get_block_bodies_with_priority(&self, hashes: Vec<H256>, priority: Priority)
-> Self::Output;
}

View File

@ -1,8 +1,8 @@
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use async_trait::async_trait;
use futures::Future;
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256};
use std::fmt::Debug;
use std::{fmt::Debug, pin::Pin};
/// The header request struct to be sent to connected peers, which
/// will proceed to ask them to stream the requested headers to us.
@ -16,23 +16,28 @@ pub struct HeadersRequest {
pub direction: HeadersDirection,
}
/// The headers future type
pub type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send + Sync>>;
/// The block headers downloader client
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: DownloadClient {
/// The headers type
type Output: Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send + Unpin;
/// Sends the header request to the p2p network and returns the header response received from a
/// peer.
async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders> {
self.get_headers_with_priority(request, Priority::Normal).await
fn get_headers(&self, request: HeadersRequest) -> Self::Output {
self.get_headers_with_priority(request, Priority::Normal)
}
/// Sends the header request to the p2p network with priroity set and returns the header
/// response received from a peer.
async fn get_headers_with_priority(
fn get_headers_with_priority(
&self,
request: HeadersRequest,
priority: Priority,
) -> PeerRequestResult<BlockHeaders>;
) -> Self::Output;
}
/// The status updater for updating the status of the p2p node

View File

@ -1,11 +1,18 @@
use crate::p2p::{
bodies::client::BodiesClient, download::DownloadClient, error::PeerRequestResult,
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::PeerRequestResult,
priority::Priority,
};
use async_trait::async_trait;
use futures::{future, Future, FutureExt};
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use std::fmt::{Debug, Formatter};
use reth_primitives::{WithPeerId, H256};
use std::{
fmt::{Debug, Formatter},
pin::Pin,
};
use tokio::sync::oneshot::{self, Receiver};
/// A test client for fetching bodies
pub struct TestBodiesClient<F> {
@ -29,16 +36,22 @@ impl<F: Sync + Send> DownloadClient for TestBodiesClient<F> {
}
}
#[async_trait]
impl<F> BodiesClient for TestBodiesClient<F>
where
F: Fn(Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> + Send + Sync,
{
async fn get_block_bodies_with_priority(
type Output = BodiesFut;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>> {
(self.responder)(hashes)
) -> Self::Output {
let (tx, rx) = oneshot::channel();
tx.send((self.responder)(hashes));
Box::pin(rx.map(|x| match x {
Ok(value) => value,
Err(err) => Err(err.into()),
}))
}
}

View File

@ -11,10 +11,11 @@ use crate::{
priority::Priority,
},
};
use futures::{Future, FutureExt, Stream, StreamExt};
use futures::{future, Future, FutureExt, Stream, StreamExt};
use reth_eth_wire::BlockHeaders;
use reth_primitives::{
BlockHash, BlockNumber, Header, HeadersDirection, PeerId, SealedBlock, SealedHeader, H256,
BlockHash, BlockNumber, Header, HeadersDirection, PeerId, SealedBlock, SealedHeader,
WithPeerId, H256,
};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
@ -26,7 +27,12 @@ use std::{
},
task::{ready, Context, Poll},
};
use tokio::sync::{watch, watch::error::SendError, Mutex};
use tokio::sync::{
oneshot::{error::RecvError, Receiver},
watch,
watch::error::SendError,
Mutex,
};
/// A test downloader which just returns the values that have been pushed to it.
#[derive(Debug)]
@ -98,7 +104,7 @@ impl Stream for TestHeaderDownloader {
}
}
type TestHeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send>>;
type TestHeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send>>;
struct TestDownload {
client: Arc<TestHeadersClient>,
@ -109,9 +115,6 @@ struct TestDownload {
done: bool,
}
/// SAFETY: All the mutations are performed through an exclusive reference on `poll`
unsafe impl Sync for TestDownload {}
impl TestDownload {
fn get_or_init_fut(&mut self) -> &mut TestHeadersFut {
if self.fut.is_none() {
@ -121,7 +124,7 @@ impl TestDownload {
start: reth_primitives::BlockHashOrNumber::Number(0), // ignored
};
let client = Arc::clone(&self.client);
self.fut = Some(Box::pin(async move { client.get_headers(request).await }));
self.fut = Some(Box::pin(client.get_headers(request)));
}
self.fut.as_mut().unwrap()
}
@ -226,22 +229,30 @@ impl DownloadClient for TestHeadersClient {
}
}
#[async_trait::async_trait]
impl HeadersClient for TestHeadersClient {
async fn get_headers_with_priority(
type Output = TestHeadersFut;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> PeerRequestResult<BlockHeaders> {
self.request_attempts.fetch_add(1, Ordering::SeqCst);
if let Some(err) = &mut *self.error.lock().await {
return Err(err.clone())
}
) -> Self::Output {
let responses = self.responses.clone();
let error = self.error.clone();
let mut lock = self.responses.lock().await;
let len = lock.len().min(request.limit as usize);
let resp = lock.drain(..len).collect();
return Ok((PeerId::default(), BlockHeaders(resp)).into())
self.request_attempts.fetch_add(1, Ordering::SeqCst);
Box::pin(async move {
if let Some(err) = &mut *error.lock().await {
return Err(err.clone())
}
let mut lock = responses.lock().await;
let len = lock.len().min(request.limit as usize);
let resp = lock.drain(..len).collect();
let with_peer_id = WithPeerId::from((PeerId::default(), BlockHeaders(resp)));
Ok(with_peer_id)
})
}
}

View File

@ -11,7 +11,7 @@ description = "Implementations of various block downloaders"
# reth
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-eth-wire = { path= "../eth-wire" }
reth-eth-wire = { path = "../eth-wire" }
reth-db = { path = "../../storage/db" }
reth-metrics-derive = { path = "../../metrics/metrics-derive" }
@ -31,6 +31,5 @@ reth-db = { path = "../../storage/db", features = ["test-utils"] }
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }
async-trait = "0.1.58"
assert_matches = "1.5.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

View File

@ -42,7 +42,7 @@ pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies";
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ConcurrentDownloader<B, DB> {
pub struct ConcurrentDownloader<B: BodiesClient, DB> {
/// The bodies client
client: Arc<B>,
/// The consensus client
@ -408,19 +408,6 @@ where
}
}
/// SAFETY: we need to ensure `ConcurrentDownloader` is `Sync` because the of the [BodyDownloader]
/// trait. While [BodiesClient] is also `Sync`, the [BodiesClient::get_block_bodies] future does
/// not enforce `Sync` (async_trait). The future itself does not use any interior mutability
/// whatsoever: All the mutations are performed through an exclusive reference on
/// `ConcurrentDownloader` when the Stream is polled. This means it suffices that
/// `ConcurrentDownloader` is Sync:
unsafe impl<B, DB> Sync for ConcurrentDownloader<B, DB>
where
B: BodiesClient,
DB: Database,
{
}
#[derive(Debug)]
struct OrderedBodiesResponse(Vec<BlockResponse>);

View File

@ -17,7 +17,7 @@ use std::{
/// The wrapper around [FuturesUnordered] that keeps information
/// about the blocks currently being requested.
#[derive(Debug)]
pub(crate) struct BodiesRequestQueue<B> {
pub(crate) struct BodiesRequestQueue<B: BodiesClient> {
/// Inner body request queue.
inner: FuturesUnordered<BodiesRequestFuture<B>>,
/// The block numbers being requested.

View File

@ -5,7 +5,7 @@ use reth_interfaces::{
consensus::{Consensus as ConsensusTrait, Consensus},
p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::{DownloadError, PeerRequestResult},
error::DownloadError,
},
};
use reth_primitives::{PeerId, SealedBlock, SealedHeader, H256};
@ -15,8 +15,6 @@ use std::{
task::{ready, Context, Poll},
};
type BodiesFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send>>;
/// Body request implemented as a [Future].
///
/// The future will poll the underlying request until fullfilled.
@ -36,7 +34,7 @@ type BodiesFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<BlockBody>>>
/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
/// that try to give us bodies that do not match the requested order are going to be penalized
/// and eventually disconnected.
pub(crate) struct BodiesRequestFuture<B> {
pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: DownloaderMetrics,
@ -45,7 +43,7 @@ pub(crate) struct BodiesRequestFuture<B> {
// Remaining hashes to download
hashes_to_download: Vec<H256>,
buffer: Vec<(PeerId, BlockBody)>,
fut: Option<BodiesFut>,
fut: Option<B::Output>,
}
impl<B> BodiesRequestFuture<B>
@ -93,7 +91,7 @@ where
let client = Arc::clone(&self.client);
let request = self.hashes_to_download.clone();
tracing::trace!(target: "downloaders::bodies", request_len = request.len(), "Requesting bodies");
self.fut = Some(Box::pin(async move { client.get_block_bodies(request).await }));
self.fut = Some(client.get_block_bodies(request));
}
fn reset_hashes(&mut self) {

View File

@ -44,7 +44,7 @@ pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers";
/// the batches of headers that this downloader yields will start at the chain tip and move towards
/// the local head: falling block numbers.
#[must_use = "Stream does nothing unless polled"]
pub struct LinearDownloader<H> {
pub struct LinearDownloader<H: HeadersClient> {
/// Consensus client used to validate headers
consensus: Arc<dyn Consensus>,
/// Client used to download headers.
@ -73,9 +73,9 @@ pub struct LinearDownloader<H> {
///
/// This will give us the block number of the `sync_target`, after which we can send multiple
/// requests at a time.
sync_target_request: Option<HeadersRequestFuture>,
sync_target_request: Option<HeadersRequestFuture<H::Output>>,
/// requests in progress
in_progress_queue: FuturesUnordered<HeadersRequestFuture>,
in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
/// Buffered, unvalidated responses
buffered_responses: BinaryHeap<OrderedHeadersResponse>,
/// Buffered, _sorted_ and validated headers ready to be returned.
@ -467,11 +467,15 @@ where
self.in_progress_queue.push(self.request_fut(request, priority));
}
fn request_fut(&self, request: HeadersRequest, priority: Priority) -> HeadersRequestFuture {
fn request_fut(
&self,
request: HeadersRequest,
priority: Priority,
) -> HeadersRequestFuture<H::Output> {
let client = Arc::clone(&self.client);
HeadersRequestFuture {
request: Some(request.clone()),
fut: Box::pin(async move { client.get_headers_with_priority(request, priority).await }),
fut: client.get_headers_with_priority(request, priority),
}
}
@ -689,28 +693,23 @@ where
}
}
/// SAFETY: we need to ensure `LinearDownloader` is `Sync` because the of the [HeaderDownloader]
/// trait. While [HeadersClient] is also `Sync`, the [HeadersClient::get_headers] future does not
/// enforce `Sync` (async_trait). The future itself does not use any interior mutability whatsoever:
/// All the mutations are performed through an exclusive reference on `LinearDownloader` when
/// the Stream is polled. This means it suffices that `LinearDownloader` is Sync:
unsafe impl<H> Sync for LinearDownloader<H> where H: HeadersClient {}
type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send>>;
/// A future that returns a list of [`BlockHeaders`] on success.
struct HeadersRequestFuture {
struct HeadersRequestFuture<F> {
request: Option<HeadersRequest>,
fut: HeadersFut,
fut: F,
}
impl Future for HeadersRequestFuture {
impl<F> Future for HeadersRequestFuture<F>
where
F: Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send + Unpin,
{
type Output = HeadersRequestOutcome;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let outcome = ready!(this.fut.poll_unpin(cx));
let request = this.request.take().unwrap();
Poll::Ready(HeadersRequestOutcome { request, outcome })
}
}

View File

@ -1,10 +1,10 @@
//! Test helper impls
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::client::BodiesClient, download::DownloadClient, error::PeerRequestResult,
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
priority::Priority,
},
test_utils::generators::random_block_range,
@ -13,7 +13,10 @@ use reth_primitives::{PeerId, SealedHeader, H256};
use std::{
collections::HashMap,
fmt::Debug,
sync::atomic::{AtomicU64, Ordering},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::Mutex;
@ -47,7 +50,7 @@ pub(crate) fn generate_bodies(
/// A [BodiesClient] for testing.
#[derive(Debug, Default)]
pub(crate) struct TestBodiesClient {
bodies: Mutex<HashMap<H256, BlockBody>>,
bodies: Arc<Mutex<HashMap<H256, BlockBody>>>,
should_delay: bool,
max_batch_size: Option<usize>,
times_requested: AtomicU64,
@ -55,7 +58,7 @@ pub(crate) struct TestBodiesClient {
impl TestBodiesClient {
pub(crate) fn with_bodies(mut self, bodies: HashMap<H256, BlockBody>) -> Self {
self.bodies = Mutex::new(bodies);
self.bodies = Arc::new(Mutex::new(bodies));
self
}
@ -84,30 +87,39 @@ impl DownloadClient for TestBodiesClient {
}
}
#[async_trait]
impl BodiesClient for TestBodiesClient {
async fn get_block_bodies_with_priority(
type Output = BodiesFut;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>> {
if self.should_delay {
tokio::time::sleep(Duration::from_millis(hashes[0].to_low_u64_be() % 100)).await;
}
) -> Self::Output {
let should_delay = self.should_delay;
let bodies = self.bodies.clone();
let max_batch_size = self.max_batch_size.clone();
self.times_requested.fetch_add(1, Ordering::Relaxed);
let bodies = &mut *self.bodies.lock().await;
Ok((
PeerId::default(),
hashes
.into_iter()
.take(self.max_batch_size.unwrap_or(usize::MAX))
.map(|hash| {
bodies
.remove(&hash)
.expect("Downloader asked for a block it should not ask for")
})
.collect(),
)
.into())
Box::pin(async move {
if should_delay {
tokio::time::sleep(Duration::from_millis(hashes[0].to_low_u64_be() % 100)).await;
}
let bodies = &mut *bodies.lock().await;
Ok((
PeerId::default(),
hashes
.into_iter()
.take(max_batch_size.unwrap_or(usize::MAX))
.map(|hash| {
bodies
.remove(&hash)
.expect("Downloader asked for a block it should not ask for")
})
.collect(),
)
.into())
})
}
}

View File

@ -1,12 +1,12 @@
//! A client implementation that can interact with the network and download data.
use crate::{fetch::DownloadRequest, peers::PeersHandle};
use reth_eth_wire::{BlockBody, BlockHeaders};
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse, peers::PeersHandle};
use futures::{future, FutureExt};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::PeerRequestResult,
headers::client::{HeadersClient, HeadersRequest},
error::RequestError,
headers::client::{HeadersClient, HeadersFut, HeadersRequest},
priority::Priority,
};
use reth_network_api::ReputationChangeKind;
@ -15,7 +15,10 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::{
mpsc::UnboundedSender,
oneshot::{self},
};
/// Front-end API for fetching data from the network.
///
@ -75,30 +78,46 @@ impl DownloadClient for FetchClient {
}
}
#[async_trait::async_trait]
impl HeadersClient for FetchClient {
type Output = HeadersFut;
/// Sends a `GetBlockHeaders` request to an available peer.
async fn get_headers_with_priority(
fn get_headers_with_priority(
&self,
request: HeadersRequest,
priority: Priority,
) -> PeerRequestResult<BlockHeaders> {
) -> Self::Output {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response, priority })?;
rx.await?.map(WithPeerId::transform)
if self
.request_tx
.send(DownloadRequest::GetBlockHeaders { request, response, priority })
.is_ok()
{
Box::pin(FlattenedResponse::from(rx).map(|r| r.map(WithPeerId::transform)))
} else {
Box::pin(future::err(RequestError::ChannelClosed))
}
}
}
#[async_trait::async_trait]
impl BodiesClient for FetchClient {
type Output = BodiesFut;
/// Sends a `GetBlockBodies` request to an available peer.
async fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority(
&self,
request: Vec<H256>,
priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>> {
) -> Self::Output {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockBodies { request, response, priority })?;
rx.await?
if self
.request_tx
.send(DownloadRequest::GetBlockBodies { request, response, priority })
.is_ok()
{
Box::pin(FlattenedResponse::from(rx))
} else {
Box::pin(future::err(RequestError::ChannelClosed))
}
}
}

View File

@ -0,0 +1,37 @@
use futures::Future;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot::{error::RecvError, Receiver};
/// Flattern a [Receiver] message in order to get rid of the [RecvError] result
#[derive(Debug)]
#[pin_project]
pub struct FlattenedResponse<T> {
#[pin]
receiver: Receiver<T>,
}
impl<T, E> Future for FlattenedResponse<Result<T, E>>
where
E: From<RecvError>,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.receiver.poll(cx).map(|r| match r {
Ok(r) => r,
Err(err) => Err(err.into()),
})
}
}
impl<T> From<Receiver<T>> for FlattenedResponse<T> {
fn from(value: Receiver<T>) -> Self {
Self { receiver: value }
}
}

View File

@ -124,6 +124,7 @@ mod discovery;
pub mod error;
pub mod eth_requests;
mod fetch;
mod flattened_response;
mod import;
mod listener;
mod manager;

View File

@ -409,12 +409,12 @@ mod tests {
consensus::Consensus,
p2p::{
bodies::{
client::BodiesClient,
client::{BodiesClient, BodiesFut},
downloader::{BodyDownloader, BodyDownloaderResult},
response::BlockResponse,
},
download::DownloadClient,
error::{DownloadResult, PeerRequestResult},
error::DownloadResult,
priority::Priority,
},
test_utils::{
@ -679,13 +679,14 @@ mod tests {
}
}
#[async_trait::async_trait]
impl BodiesClient for NoopClient {
async fn get_block_bodies_with_priority(
type Output = BodiesFut;
fn get_block_bodies_with_priority(
&self,
_hashes: Vec<H256>,
_priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>> {
) -> Self::Output {
panic!("Noop client should not be called")
}
}