mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Always accept incoming connections from trusted peers (#7140)
This commit is contained in:
@ -213,8 +213,7 @@ impl PeersManager {
|
||||
|
||||
/// Invoked when a new _incoming_ tcp connection is accepted.
|
||||
///
|
||||
/// returns an error if the inbound ip address is on the ban list or
|
||||
/// we have reached our limit for max inbound connections
|
||||
/// returns an error if the inbound ip address is on the ban list
|
||||
pub(crate) fn on_incoming_pending_session(
|
||||
&mut self,
|
||||
addr: IpAddr,
|
||||
@ -222,10 +221,6 @@ impl PeersManager {
|
||||
if self.ban_list.is_banned_ip(&addr) {
|
||||
return Err(InboundConnectionError::IpBanned)
|
||||
}
|
||||
if !self.connection_info.has_in_capacity() {
|
||||
return Err(InboundConnectionError::ExceedsLimit(self.connection_info.max_inbound))
|
||||
}
|
||||
// keep track of new connection
|
||||
self.connection_info.inc_in();
|
||||
Ok(())
|
||||
}
|
||||
@ -284,6 +279,14 @@ impl PeersManager {
|
||||
return
|
||||
}
|
||||
value.state = PeerConnectionState::In;
|
||||
// if a peer is not trusted and we don't have capacity for more inbound connections,
|
||||
// disconnecting the peer
|
||||
if !value.is_trusted() && !self.connection_info.has_in_capacity() {
|
||||
self.queued_actions.push_back(PeerAction::Disconnect {
|
||||
peer_id,
|
||||
reason: Some(DisconnectReason::TooManyPeers),
|
||||
});
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
// peer is missing in the table, we add it but mark it as to be removed after
|
||||
@ -292,6 +295,14 @@ impl PeersManager {
|
||||
peer.remove_after_disconnect = true;
|
||||
entry.insert(peer);
|
||||
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
|
||||
|
||||
// disconnect the peer if we don't have capacity for more inbound connections
|
||||
if !self.connection_info.has_in_capacity() {
|
||||
self.queued_actions.push_back(PeerAction::Disconnect {
|
||||
peer_id,
|
||||
reason: Some(DisconnectReason::TooManyPeers),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -573,7 +584,10 @@ impl PeersManager {
|
||||
/// to us at the same time and this connection is already established.
|
||||
pub(crate) fn on_already_connected(&mut self, direction: Direction) {
|
||||
match direction {
|
||||
Direction::Incoming => {}
|
||||
Direction::Incoming => {
|
||||
// need to decrement the ingoing counter
|
||||
self.connection_info.decr_in();
|
||||
}
|
||||
Direction::Outgoing(_) => {
|
||||
// need to decrement the outgoing counter
|
||||
self.connection_info.decr_out();
|
||||
@ -875,7 +889,7 @@ impl ConnectionInfo {
|
||||
|
||||
/// Returns `true` if there's still capacity for a new incoming connection.
|
||||
fn has_in_capacity(&self) -> bool {
|
||||
self.num_inbound < self.max_inbound
|
||||
self.num_inbound <= self.max_inbound
|
||||
}
|
||||
|
||||
fn decr_state(&mut self, state: PeerConnectionState) {
|
||||
@ -1420,7 +1434,6 @@ impl Default for PeerBackoffDurations {
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum InboundConnectionError {
|
||||
ExceedsLimit(usize),
|
||||
IpBanned,
|
||||
}
|
||||
|
||||
@ -1449,7 +1462,7 @@ mod tests {
|
||||
DisconnectReason,
|
||||
};
|
||||
use reth_net_common::ban_list::BanList;
|
||||
use reth_network_api::ReputationChangeKind;
|
||||
use reth_network_api::{Direction, ReputationChangeKind};
|
||||
use reth_primitives::{PeerId, B512};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
@ -1989,6 +2002,28 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_already_connected() {
|
||||
let peer = PeerId::random();
|
||||
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
|
||||
let mut peers = PeersManager::default();
|
||||
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
|
||||
peers.on_incoming_session_established(peer, socket_addr);
|
||||
|
||||
// peer should have been added and num_inbound should have been increased
|
||||
let p = peers.peers.get_mut(&peer).expect("peer not found");
|
||||
assert_eq!(p.addr, socket_addr);
|
||||
assert_eq!(peers.connection_info.num_inbound, 1);
|
||||
|
||||
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
|
||||
peers.on_already_connected(Direction::Incoming);
|
||||
|
||||
// peer should not be connected and num_inbound should not have been increased
|
||||
let p = peers.peers.get_mut(&peer).expect("peer not found");
|
||||
assert_eq!(p.addr, socket_addr);
|
||||
assert_eq!(peers.connection_info.num_inbound, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reputation_change_trusted_peer() {
|
||||
let peer = PeerId::random();
|
||||
@ -2166,9 +2201,6 @@ mod tests {
|
||||
Ok(_) => panic!(),
|
||||
Err(err) => match err {
|
||||
super::InboundConnectionError::IpBanned {} => {}
|
||||
super::InboundConnectionError::ExceedsLimit { .. } => {
|
||||
panic!()
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,9 +53,6 @@ pub use handle::{
|
||||
use reth_eth_wire::multiplex::RlpxProtocolMultiplexer;
|
||||
pub use reth_network_api::{Direction, PeerInfo};
|
||||
|
||||
/// Maximum allowed graceful disconnects at a time.
|
||||
const MAX_GRACEFUL_DISCONNECTS: usize = 15;
|
||||
|
||||
/// Internal identifier for active sessions.
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
|
||||
pub struct SessionId(usize);
|
||||
@ -113,8 +110,6 @@ pub struct SessionManager {
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
/// Metrics for the session manager.
|
||||
metrics: SessionManagerMetrics,
|
||||
/// Tracks the number of active graceful disconnects for incoming connections.
|
||||
graceful_disconnects_counter: GracefulDisconnects,
|
||||
}
|
||||
|
||||
// === impl SessionManager ===
|
||||
@ -156,7 +151,6 @@ impl SessionManager {
|
||||
bandwidth_meter,
|
||||
extra_protocols,
|
||||
metrics: Default::default(),
|
||||
graceful_disconnects_counter: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -310,27 +304,6 @@ impl SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a disconnect message to the peer with the given [DisconnectReason].
|
||||
pub(crate) fn disconnect_incoming_connection(
|
||||
&mut self,
|
||||
stream: TcpStream,
|
||||
reason: DisconnectReason,
|
||||
) {
|
||||
let counter = self.graceful_disconnects_counter.clone();
|
||||
if counter.exceeds_limit() {
|
||||
// simply drop the connection if there are too many active disconnects already
|
||||
return
|
||||
}
|
||||
let secret_key = self.secret_key;
|
||||
|
||||
self.spawn(async move {
|
||||
if let Ok(stream) = get_eciess_stream(stream, secret_key, Direction::Incoming).await {
|
||||
let _ = UnauthedP2PStream::new(stream).send_disconnect(reason).await;
|
||||
}
|
||||
drop(counter)
|
||||
});
|
||||
}
|
||||
|
||||
/// Initiates a shutdown of all sessions.
|
||||
///
|
||||
/// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
|
||||
@ -635,18 +608,6 @@ impl SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Keep track of graceful disconnects for incoming connections.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct GracefulDisconnects(Arc<()>);
|
||||
|
||||
impl GracefulDisconnects {
|
||||
/// Returns true if the number of graceful disconnects exceeds the limit
|
||||
/// [MAX_GRACEFUL_DISCONNECTS]
|
||||
fn exceeds_limit(&self) -> bool {
|
||||
Arc::strong_count(&self.0) > MAX_GRACEFUL_DISCONNECTS
|
||||
}
|
||||
}
|
||||
|
||||
/// Events produced by the [`SessionManager`]
|
||||
#[derive(Debug)]
|
||||
pub enum SessionEvent {
|
||||
|
||||
@ -10,7 +10,7 @@ use futures::Stream;
|
||||
use reth_eth_wire::{
|
||||
capability::{Capabilities, CapabilityMessage},
|
||||
errors::EthStreamError,
|
||||
DisconnectReason, EthVersion, Status,
|
||||
EthVersion, Status,
|
||||
};
|
||||
use reth_primitives::PeerId;
|
||||
use reth_provider::{BlockNumReader, BlockReader};
|
||||
@ -29,7 +29,7 @@ use tracing::trace;
|
||||
///
|
||||
/// A swarm emits [`SwarmEvent`]s when polled.
|
||||
///
|
||||
/// The manages the [`ConnectionListener`] and delegates new incoming connections to the
|
||||
/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
|
||||
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
|
||||
/// [`NetworkState`] and also delegated to the [`NetworkState`].
|
||||
///
|
||||
@ -203,13 +203,6 @@ where
|
||||
InboundConnectionError::IpBanned => {
|
||||
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
|
||||
}
|
||||
InboundConnectionError::ExceedsLimit(limit) => {
|
||||
trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; disconnecting");
|
||||
self.sessions.disconnect_incoming_connection(
|
||||
stream,
|
||||
DisconnectReason::TooManyPeers,
|
||||
);
|
||||
}
|
||||
}
|
||||
return None
|
||||
}
|
||||
|
||||
@ -585,6 +585,123 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
|
||||
net_handle.terminate().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_always_accept_incoming_connections_from_trusted_peers() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let other_peer1 = new_random_peer(10, HashSet::new()).await;
|
||||
let other_peer2 = new_random_peer(10, HashSet::new()).await;
|
||||
let other_peer3 = new_random_peer(0, HashSet::new()).await;
|
||||
|
||||
// setup the peer with max_inbound = 1, and add other_peer_3 as trust nodes
|
||||
let peer = new_random_peer(
|
||||
1,
|
||||
HashSet::from([NodeRecord::new(other_peer3.local_addr(), *other_peer3.peer_id())]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let handle = peer.handle().clone();
|
||||
let other_peer_handle1 = other_peer1.handle().clone();
|
||||
let other_peer_handle2 = other_peer2.handle().clone();
|
||||
let other_peer_handle3 = other_peer3.handle().clone();
|
||||
|
||||
tokio::task::spawn(peer);
|
||||
tokio::task::spawn(other_peer1);
|
||||
tokio::task::spawn(other_peer2);
|
||||
tokio::task::spawn(other_peer3);
|
||||
|
||||
let mut events = NetworkEventStream::new(handle.event_listener());
|
||||
let mut events2 = NetworkEventStream::new(other_peer_handle2.event_listener());
|
||||
|
||||
// though we added other_peer3 as a trust node, the incoming connection should fail because
|
||||
// peer3 doesn't allow inbound connections
|
||||
let (peer_id, reason) = events.next_session_closed().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle3.peer_id());
|
||||
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
|
||||
|
||||
// incoming connection should succeed
|
||||
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
let peer_id = events.next_session_established().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle1.peer_id());
|
||||
assert_eq!(handle.num_connected_peers(), 1);
|
||||
|
||||
// incoming connection should fail because exceeding max_inbound
|
||||
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
let (peer_id, reason) = events.next_session_closed().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle2.peer_id());
|
||||
// fixme: this should be `Some(DisconnectReason::TooManyPeers)` but `None`
|
||||
assert_eq!(reason, None);
|
||||
|
||||
let (peer_id, reason) = events2.next_session_closed().await.unwrap();
|
||||
assert_eq!(peer_id, *handle.peer_id());
|
||||
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
|
||||
|
||||
// outbound connection from `other_peer3` should succeed
|
||||
other_peer_handle3.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
let peer_id = events.next_session_established().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle3.peer_id());
|
||||
|
||||
// sleep is needed because the disconnect event happened after session_established event
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
assert_eq!(handle.num_connected_peers(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_rejected_by_already_connect() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let other_peer1 = new_random_peer(10, HashSet::new()).await;
|
||||
let other_peer2 = new_random_peer(10, HashSet::new()).await;
|
||||
|
||||
// setup the peer with max_inbound = 2
|
||||
let peer = new_random_peer(2, HashSet::new()).await;
|
||||
|
||||
let handle = peer.handle().clone();
|
||||
let other_peer_handle1 = other_peer1.handle().clone();
|
||||
let other_peer_handle2 = other_peer2.handle().clone();
|
||||
|
||||
tokio::task::spawn(peer);
|
||||
tokio::task::spawn(other_peer1);
|
||||
tokio::task::spawn(other_peer2);
|
||||
|
||||
let mut events = NetworkEventStream::new(handle.event_listener());
|
||||
|
||||
// incoming connection should succeed
|
||||
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
let peer_id = events.next_session_established().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle1.peer_id());
|
||||
assert_eq!(handle.num_connected_peers(), 1);
|
||||
|
||||
// incoming connection from the same peer should be rejected by already connected
|
||||
// and num_inbount should still be 1
|
||||
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// incoming connection from other_peer2 should succeed
|
||||
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
|
||||
let peer_id = events.next_session_established().await.unwrap();
|
||||
assert_eq!(peer_id, *other_peer_handle2.peer_id());
|
||||
|
||||
// wait 2 seconds and check that other_peer2 is not rejected by TooManyPeers
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert_eq!(handle.num_connected_peers(), 2);
|
||||
}
|
||||
|
||||
async fn new_random_peer(
|
||||
max_in_bound: usize,
|
||||
trusted_nodes: HashSet<NodeRecord>,
|
||||
) -> NetworkManager<NoopProvider> {
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
let peers_config =
|
||||
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);
|
||||
|
||||
let config = NetworkConfigBuilder::new(secret_key)
|
||||
.listener_port(0)
|
||||
.disable_discovery()
|
||||
.peer_config(peers_config)
|
||||
.build(NoopProvider::default());
|
||||
|
||||
NetworkManager::new(config).await.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_connect_many() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
Reference in New Issue
Block a user