perf(net): use qualified types for headers client future (#1115)

This commit is contained in:
Matthias Seitz
2023-02-01 13:12:48 +01:00
committed by GitHub
parent 0ea1360cd6
commit 28cb91c6b4
6 changed files with 29 additions and 26 deletions

View File

@ -168,14 +168,14 @@ impl Command {
let (_, response) = client.get_headers(request).await?.split();
if response.0.len() != 1 {
if response.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
response.0.len()
response.len()
)
}
let header = response.0.into_iter().next().unwrap().seal();
let header = response.into_iter().next().unwrap().seal();
let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,

View File

@ -1,7 +1,7 @@
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use futures::Future;
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256};
use reth_primitives::{BlockHashOrNumber, Header, HeadersDirection, H256, U256};
use std::{fmt::Debug, pin::Pin};
/// The header request struct to be sent to connected peers, which
@ -17,13 +17,13 @@ pub struct HeadersRequest {
}
/// The headers future type
pub type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send + Sync>>;
pub type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<Header>>> + Send + Sync>>;
/// The block headers downloader client
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: DownloadClient {
/// The headers type
type Output: Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send + Unpin;
type Output: Future<Output = PeerRequestResult<Vec<Header>>> + Sync + Send + Unpin;
/// Sends the header request to the p2p network and returns the header response received from a
/// peer.
@ -31,7 +31,7 @@ pub trait HeadersClient: DownloadClient {
self.get_headers_with_priority(request, Priority::Normal)
}
/// Sends the header request to the p2p network with priroity set and returns the header
/// Sends the header request to the p2p network with priority set and returns the header
/// response received from a peer.
fn get_headers_with_priority(
&self,

View File

@ -104,7 +104,7 @@ impl Stream for TestHeaderDownloader {
}
}
type TestHeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send>>;
type TestHeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<Header>>> + Sync + Send>>;
struct TestDownload {
client: Arc<TestHeadersClient>,
@ -168,7 +168,7 @@ impl Stream for TestDownload {
Ok(resp) => {
// Skip head and seal headers
let mut headers =
resp.1 .0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>();
resp.1.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>();
headers.sort_unstable_by_key(|h| h.number);
headers.into_iter().for_each(|h| this.buffer.push(h));
this.done = true;
@ -250,7 +250,7 @@ impl HeadersClient for TestHeadersClient {
let mut lock = responses.lock().await;
let len = lock.len().min(request.limit as usize);
let resp = lock.drain(..len).collect();
let with_peer_id = WithPeerId::from((PeerId::default(), BlockHeaders(resp)));
let with_peer_id = WithPeerId::from((PeerId::default(), resp));
Ok(with_peer_id)
})
}

View File

@ -8,7 +8,7 @@ use reth_interfaces::{
p2p::{
error::{DownloadError, DownloadResult, PeerRequestResult},
headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest},
client::{HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader, SyncTarget},
},
priority::Priority,
@ -279,8 +279,7 @@ where
let HeadersRequestOutcome { request, outcome } = response;
match outcome {
Ok(res) => {
let (peer_id, headers) = res.split();
let mut headers = headers.0;
let (peer_id, mut headers) = res.split();
// update total downloaded metric
self.metrics.total_downloaded.increment(headers.len() as u64);
@ -337,8 +336,7 @@ where
match outcome {
Ok(res) => {
let (peer_id, headers) = res.split();
let mut headers = headers.0;
let (peer_id, mut headers) = res.split();
// update total downloaded metric
self.metrics.total_downloaded.increment(headers.len() as u64);
@ -693,7 +691,7 @@ where
}
}
/// A future that returns a list of [`BlockHeaders`] on success.
/// A future that returns a list of [`Header`] on success.
struct HeadersRequestFuture<F> {
request: Option<HeadersRequest>,
fut: F,
@ -701,7 +699,7 @@ struct HeadersRequestFuture<F> {
impl<F> Future for HeadersRequestFuture<F>
where
F: Future<Output = PeerRequestResult<BlockHeaders>> + Sync + Send + Unpin,
F: Future<Output = PeerRequestResult<Vec<Header>>> + Sync + Send + Unpin,
{
type Output = HeadersRequestOutcome;
@ -717,7 +715,7 @@ where
/// The outcome of the [HeadersRequestFuture]
struct HeadersRequestOutcome {
request: HeadersRequest,
outcome: PeerRequestResult<BlockHeaders>,
outcome: PeerRequestResult<Vec<Header>>,
}
// === impl OrderedHeadersResponse ===

View File

@ -1,16 +1,17 @@
//! A client implementation that can interact with the network and download data.
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse, peers::PeersHandle};
use futures::{future, FutureExt};
use futures::{future, future::Either};
use reth_interfaces::p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::RequestError,
headers::client::{HeadersClient, HeadersFut, HeadersRequest},
error::{PeerRequestResult, RequestError},
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
};
use reth_network_api::ReputationChangeKind;
use reth_primitives::{PeerId, WithPeerId, H256};
use reth_primitives::{Header, PeerId, H256};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -78,8 +79,12 @@ impl DownloadClient for FetchClient {
}
}
// The `Output` future of the [HeadersClient] impl of [FetchClient] that either returns a response
// or an error.
type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
impl HeadersClient for FetchClient {
type Output = HeadersFut;
type Output = HeadersClientFuture<PeerRequestResult<Vec<Header>>>;
/// Sends a `GetBlockHeaders` request to an available peer.
fn get_headers_with_priority(
@ -93,9 +98,9 @@ impl HeadersClient for FetchClient {
.send(DownloadRequest::GetBlockHeaders { request, response, priority })
.is_ok()
{
Box::pin(FlattenedResponse::from(rx).map(|r| r.map(WithPeerId::transform)))
Either::Left(FlattenedResponse::from(rx))
} else {
Box::pin(future::err(RequestError::ChannelClosed))
Either::Right(future::err(RequestError::ChannelClosed))
}
}
}

View File

@ -115,7 +115,7 @@ async fn test_get_header() {
let res = fetch0.get_headers(req).await;
assert!(res.is_ok(), "{res:?}");
let headers = res.unwrap().1 .0;
let headers = res.unwrap().1;
assert_eq!(headers.len(), 1);
assert_eq!(headers[0], header);
}