refactor: deprecate downloader trait (#959)

This commit is contained in:
Matthias Seitz
2023-01-24 13:38:11 +01:00
committed by GitHub
parent 167aa60ed2
commit a24048a237
14 changed files with 43 additions and 128 deletions

View File

@ -1,4 +1,4 @@
use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult, priority::Priority};
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;

View File

@ -1,5 +1,5 @@
use super::response::BlockResponse;
use crate::{db, p2p::downloader::Downloader};
use crate::db;
use futures::Stream;
use reth_primitives::BlockNumber;
use std::ops::Range;
@ -8,7 +8,7 @@ use std::ops::Range;
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [BodiesClient] represents a client capable of fulfilling these requests.
pub trait BodyDownloader: Downloader + Stream<Item = Vec<BlockResponse>> + Unpin {
pub trait BodyDownloader: Send + Sync + Stream<Item = Vec<BlockResponse>> + Unpin {
/// Method for setting the download range.
fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error>;
}

View File

@ -0,0 +1,12 @@
use reth_primitives::PeerId;
use std::fmt::Debug;
/// Generic download client for peer penalization
pub trait DownloadClient: Send + Sync + Debug {
/// Penalize the peer for responding with a message
/// that violates validation rules
fn report_bad_message(&self, peer_id: PeerId);
/// Returns how many peers the network is currently connected to.
fn num_connected_peers(&self) -> usize;
}

View File

@ -1,34 +0,0 @@
use super::error::DownloadResult;
use crate::consensus::Consensus;
use futures::Stream;
use reth_primitives::PeerId;
use std::{fmt::Debug, pin::Pin};
/// A stream for downloading response.
pub type DownloadStream<'a, T> = Pin<Box<dyn Stream<Item = DownloadResult<T>> + Send + 'a>>;
/// Generic download client for peer penalization
pub trait DownloadClient: Send + Sync + Debug {
/// Penalize the peer for responding with a message
/// that violates validation rules
fn report_bad_message(&self, peer_id: PeerId);
/// Returns how many peers the network is currently connected to.
fn num_connected_peers(&self) -> usize;
}
/// The generic trait for requesting and verifying data
/// over p2p network client
pub trait Downloader: Send + Sync {
/// The client used to fetch necessary data
type Client: DownloadClient;
/// The Consensus used to verify data validity when downloading
type Consensus: Consensus;
/// The headers client
fn client(&self) -> &Self::Client;
/// The consensus engine
fn consensus(&self) -> &Self::Consensus;
}

View File

@ -1,4 +1,4 @@
use crate::p2p::{downloader::DownloadClient, error::PeerRequestResult, priority::Priority};
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use async_trait::async_trait;
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256};

View File

@ -1,9 +1,6 @@
use crate::{
consensus::Consensus,
p2p::{
downloader::Downloader,
error::{DownloadError, DownloadResult},
},
p2p::error::{DownloadError, DownloadResult},
};
use futures::Stream;
use reth_primitives::{SealedHeader, H256};
@ -14,7 +11,7 @@ use reth_primitives::{SealedHeader, H256};
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
///
/// A [HeaderDownloader] is a [Stream] that returns batches for headers.
pub trait HeaderDownloader: Downloader + Stream<Item = Vec<SealedHeader>> + Unpin {
pub trait HeaderDownloader: Send + Sync + Stream<Item = Vec<SealedHeader>> + Unpin {
/// Updates the gap to sync which ranges from local head to the sync target
///
/// See also [HeaderDownloader::update_sync_target] and [HeaderDownloader::update_local_head]
@ -33,10 +30,7 @@ pub trait HeaderDownloader: Downloader + Stream<Item = Vec<SealedHeader>> + Unpi
fn set_batch_size(&mut self, limit: usize);
/// Validate whether the header is valid in relation to it's parent
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(self.consensus(), header, parent)?;
Ok(())
}
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()>;
}
/// Specifies the target to sync for [HeaderDownloader::update_sync_target]

View File

@ -1,5 +1,5 @@
/// The generic downloader trait for implementing data downloaders over P2P.
pub mod downloader;
/// Shared abstractions for downloader implementations.
pub mod download;
/// Traits for implementing P2P block body clients.
pub mod bodies;

View File

@ -1,5 +1,5 @@
use crate::p2p::{
bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult,
bodies::client::BodiesClient, download::DownloadClient, error::PeerRequestResult,
priority::Priority,
};
use async_trait::async_trait;

View File

@ -2,11 +2,11 @@
use crate::{
consensus::{self, Consensus},
p2p::{
downloader::{DownloadClient, Downloader},
download::DownloadClient,
error::{DownloadError, DownloadResult, PeerRequestResult, RequestError},
headers::{
client::{HeadersClient, HeadersRequest, StatusUpdater},
downloader::{HeaderDownloader, SyncTarget},
downloader::{validate_header_download, HeaderDownloader, SyncTarget},
},
priority::Priority,
},
@ -62,19 +62,6 @@ impl TestHeaderDownloader {
}
}
impl Downloader for TestHeaderDownloader {
type Client = TestHeadersClient;
type Consensus = TestConsensus;
fn client(&self) -> &Self::Client {
&self.client
}
fn consensus(&self) -> &Self::Consensus {
&self.consensus
}
}
impl HeaderDownloader for TestHeaderDownloader {
fn update_local_head(&mut self, _head: SealedHeader) {}
@ -83,6 +70,11 @@ impl HeaderDownloader for TestHeaderDownloader {
fn set_batch_size(&mut self, limit: usize) {
self.batch_size = limit;
}
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(&self.consensus, header, parent)?;
Ok(())
}
}
impl Stream for TestHeaderDownloader {

View File

@ -5,14 +5,10 @@ use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}
use reth_interfaces::{
consensus::Consensus as ConsensusTrait,
db,
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader, response::BlockResponse},
downloader::Downloader,
},
p2p::bodies::{client::BodiesClient, downloader::BodyDownloader, response::BlockResponse},
};
use reth_primitives::{BlockNumber, SealedHeader};
use std::{
borrow::Borrow,
cmp::Ordering,
collections::BinaryHeap,
ops::{Range, RangeInclusive},
@ -224,24 +220,6 @@ where
}
}
impl<B, C, DB> Downloader for ConcurrentDownloader<B, C, DB>
where
B: BodiesClient,
C: ConsensusTrait,
DB: Database,
{
type Client = B;
type Consensus = C;
fn client(&self) -> &Self::Client {
self.client.borrow()
}
fn consensus(&self) -> &Self::Consensus {
self.consensus.borrow()
}
}
impl<B, C, DB> BodyDownloader for ConcurrentDownloader<B, C, DB>
where
B: BodiesClient + 'static,
@ -563,7 +541,7 @@ mod tests {
let db = create_test_db::<WriteMap>(EnvKind::RW);
let (headers, mut bodies) = generate_bodies(0..20);
insert_headers(db.borrow(), &headers);
insert_headers(&db, &headers);
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
@ -590,7 +568,7 @@ mod tests {
let db = create_test_db::<WriteMap>(EnvKind::RW);
let (headers, mut bodies) = generate_bodies(0..100);
insert_headers(db.borrow(), &headers);
insert_headers(&db, &headers);
let stream_batch_size = 20;
let request_limit = 10;
@ -630,7 +608,7 @@ mod tests {
let db = create_test_db::<WriteMap>(EnvKind::RW);
let (headers, mut bodies) = generate_bodies(0..200);
insert_headers(db.borrow(), &headers);
insert_headers(&db, &headers);
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let mut downloader = ConcurrentDownloaderBuilder::default()

View File

@ -5,18 +5,16 @@ use futures_util::{stream::FuturesUnordered, StreamExt};
use reth_interfaces::{
consensus::Consensus,
p2p::{
downloader::Downloader,
error::{DownloadError, PeerRequestResult},
error::{DownloadError, DownloadResult, PeerRequestResult},
headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest},
downloader::{HeaderDownloader, SyncTarget},
downloader::{validate_header_download, HeaderDownloader, SyncTarget},
},
priority::Priority,
},
};
use reth_primitives::{Header, HeadersDirection, PeerId, SealedHeader, H256};
use std::{
borrow::Borrow,
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
future::Future,
@ -451,23 +449,6 @@ where
}
}
impl<C, H> Downloader for LinearDownloader<C, H>
where
C: Consensus,
H: HeadersClient,
{
type Client = H;
type Consensus = C;
fn client(&self) -> &Self::Client {
self.client.borrow()
}
fn consensus(&self) -> &Self::Consensus {
self.consensus.borrow()
}
}
impl<C, H> HeaderDownloader for LinearDownloader<C, H>
where
C: Consensus + 'static,
@ -533,6 +514,11 @@ where
fn set_batch_size(&mut self, batch_size: usize) {
self.stream_batch_size = batch_size;
}
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(&self.consensus, header, parent)?;
Ok(())
}
}
impl<C, H> Stream for LinearDownloader<C, H>

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::client::BodiesClient, downloader::DownloadClient, error::PeerRequestResult,
bodies::client::BodiesClient, download::DownloadClient, error::PeerRequestResult,
priority::Priority,
},
test_utils::generators::random_block_range,

View File

@ -7,7 +7,7 @@ use crate::{
use reth_eth_wire::{BlockBody, BlockHeaders};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
downloader::DownloadClient,
download::DownloadClient,
error::PeerRequestResult,
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,

View File

@ -413,7 +413,7 @@ mod tests {
bodies::{
client::BodiesClient, downloader::BodyDownloader, response::BlockResponse,
},
downloader::{DownloadClient, Downloader},
download::DownloadClient,
error::PeerRequestResult,
priority::Priority,
},
@ -717,19 +717,6 @@ mod tests {
}
}
impl Downloader for TestBodyDownloader {
type Client = NoopClient;
type Consensus = TestConsensus;
fn client(&self) -> &Self::Client {
unreachable!()
}
fn consensus(&self) -> &Self::Consensus {
unreachable!()
}
}
impl BodyDownloader for TestBodyDownloader {
fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error> {
self.headers = VecDeque::from(self.db.view(|tx| {