feat(net): integrate num active peers in downloader (#900)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Matthias Seitz
2023-01-17 10:31:52 +01:00
committed by GitHub
parent 06be083335
commit 79fad91ca0
9 changed files with 53 additions and 10 deletions

View File

@ -13,6 +13,9 @@ pub trait DownloadClient: Send + Sync + Debug {
/// Penalize the peer for responding with a message
/// that violates validation rules
fn report_bad_message(&self, peer_id: PeerId);
/// Returns how many peers the network is currently connected to.
fn num_connected_peers(&self) -> usize;
}
/// The generic trait for requesting and verifying data

View File

@ -22,6 +22,10 @@ impl<F: Sync + Send> DownloadClient for TestBodiesClient<F> {
fn report_bad_message(&self, _peer_id: reth_primitives::PeerId) {
// noop
}
fn num_connected_peers(&self) -> usize {
0
}
}
#[async_trait]

View File

@ -179,6 +179,10 @@ impl DownloadClient for TestHeadersClient {
fn report_bad_message(&self, _peer_id: PeerId) {
// noop
}
fn num_connected_peers(&self) -> usize {
0
}
}
#[async_trait::async_trait]

View File

@ -57,6 +57,10 @@ impl<F: Send + Sync> DownloadClient for TestBodiesClient<F> {
fn report_bad_message(&self, _peer_id: PeerId) {
// noop
}
fn num_connected_peers(&self) -> usize {
0
}
}
#[async_trait]

View File

@ -12,6 +12,10 @@ use reth_interfaces::p2p::{
headers::client::{HeadersClient, HeadersRequest},
};
use reth_primitives::{PeerId, WithPeerId, H256};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Front-end API for fetching data from the network.
@ -58,12 +62,18 @@ pub struct FetchClient {
pub(crate) request_tx: UnboundedSender<DownloadRequest>,
/// The handle to the peers
pub(crate) peers_handle: PeersHandle,
/// Number of active peer sessions the node's currently handling.
pub(crate) num_active_peers: Arc<AtomicUsize>,
}
impl DownloadClient for FetchClient {
fn report_bad_message(&self, peer_id: PeerId) {
self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
}
fn num_connected_peers(&self) -> usize {
self.num_active_peers.load(Ordering::Relaxed)
}
}
#[async_trait::async_trait]

View File

@ -11,7 +11,7 @@ use reth_primitives::{Header, PeerId, H256};
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
@ -40,6 +40,8 @@ pub struct StateFetcher {
peers: HashMap<PeerId, Peer>,
/// The handle to the peers manager
peers_handle: PeersHandle,
/// Number of active peer sessions the node's currently handling.
num_active_peers: Arc<AtomicUsize>,
/// Requests queued for processing
queued_requests: VecDeque<DownloadRequest>,
/// Receiver for new incoming download requests
@ -51,13 +53,14 @@ pub struct StateFetcher {
// === impl StateSyncer ===
impl StateFetcher {
pub(crate) fn new(peers_handle: PeersHandle) -> Self {
pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
peers: Default::default(),
peers_handle,
num_active_peers,
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
download_requests_tx,
@ -261,6 +264,7 @@ impl StateFetcher {
FetchClient {
request_tx: self.download_requests_tx.clone(),
peers_handle: self.peers_handle.clone(),
num_active_peers: Arc::clone(&self.num_active_peers),
}
}
}
@ -393,7 +397,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_poll_fetcher() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle());
let mut fetcher = StateFetcher::new(manager.handle(), Default::default());
poll_fn(move |cx| {
assert!(fetcher.poll(cx).is_pending());
@ -411,7 +415,7 @@ mod tests {
#[tokio::test]
async fn test_peer_rotation() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle());
let mut fetcher = StateFetcher::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = H512::random();
let peer2 = H512::random();
@ -434,7 +438,7 @@ mod tests {
#[tokio::test]
async fn test_peer_prioritization() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle());
let mut fetcher = StateFetcher::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = H512::random();
let peer2 = H512::random();

View File

@ -184,6 +184,7 @@ where
// need to retrieve the addr here since provided port could be `0`
let local_peer_id = discovery.local_id();
let num_active_peers = Arc::new(AtomicUsize::new(0));
let bandwidth_meter: BandwidthMeter = BandwidthMeter::default();
let sessions = SessionManager::new(
@ -195,13 +196,18 @@ where
fork_filter,
bandwidth_meter.clone(),
);
let state = NetworkState::new(client, discovery, peers_manager, genesis_hash);
let state = NetworkState::new(
client,
discovery,
peers_manager,
genesis_hash,
Arc::clone(&num_active_peers),
);
let swarm = Swarm::new(incoming, sessions, state);
let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
let num_active_peers = Arc::new(AtomicUsize::new(0));
let handle = NetworkHandle::new(
Arc::clone(&num_active_peers),
listener_address,

View File

@ -20,7 +20,10 @@ use std::{
collections::{HashMap, VecDeque},
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, AtomicUsize},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::oneshot;
@ -69,8 +72,9 @@ where
discovery: Discovery,
peers_manager: PeersManager,
genesis_hash: H256,
num_active_peers: Arc<AtomicUsize>,
) -> Self {
let state_fetcher = StateFetcher::new(peers_manager.handle());
let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
Self {
active_peers: Default::default(),
peers_manager,
@ -525,7 +529,7 @@ mod tests {
client: Arc::new(NoopProvider::default()),
discovery: Discovery::noop(),
genesis_hash: Default::default(),
state_fetcher: StateFetcher::new(handle),
state_fetcher: StateFetcher::new(handle, Default::default()),
}
}

View File

@ -775,6 +775,10 @@ mod tests {
fn report_bad_message(&self, _: reth_primitives::PeerId) {
panic!("Noop client should not be called")
}
fn num_connected_peers(&self) -> usize {
panic!("Noop client should not be called")
}
}
#[async_trait::async_trait]