feat(net/peer): add peer with udp socket (#9156)

Signed-off-by: jsvisa <delweng@gmail.com>
This commit is contained in:
Delweng
2024-06-29 17:02:09 +08:00
committed by GitHub
parent 57c4f7e570
commit b93e70c429
10 changed files with 221 additions and 88 deletions

View File

@ -66,9 +66,14 @@ pub trait PeersInfo: Send + Sync {
/// Provides an API for managing the peers of the network. /// Provides an API for managing the peers of the network.
pub trait Peers: PeersInfo { pub trait Peers: PeersInfo {
/// Adds a peer to the peer set. /// Adds a peer to the peer set with UDP `SocketAddr`.
fn add_peer(&self, peer: PeerId, addr: SocketAddr) { fn add_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, addr); self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, None);
}
/// Adds a peer to the peer set with TCP and UDP `SocketAddr`.
fn add_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, Some(udp_addr));
} }
/// Adds a trusted [`PeerId`] to the peer set. /// Adds a trusted [`PeerId`] to the peer set.
@ -76,13 +81,24 @@ pub trait Peers: PeersInfo {
/// This allows marking a peer as trusted without having to know the peer's address. /// This allows marking a peer as trusted without having to know the peer's address.
fn add_trusted_peer_id(&self, peer: PeerId); fn add_trusted_peer_id(&self, peer: PeerId);
/// Adds a trusted peer to the peer set. /// Adds a trusted peer to the peer set with UDP `SocketAddr`.
fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) { fn add_trusted_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, addr); self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, None);
}
/// Adds a trusted peer with TCP and UDP `SocketAddr` to the peer set.
fn add_trusted_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, Some(udp_addr));
} }
/// Adds a peer to the known peer set, with the given kind. /// Adds a peer to the known peer set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr); fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
);
/// Returns the rpc [`PeerInfo`] for all connected [`PeerKind::Trusted`] peers. /// Returns the rpc [`PeerInfo`] for all connected [`PeerKind::Trusted`] peers.
fn get_trusted_peers( fn get_trusted_peers(

View File

@ -71,7 +71,14 @@ impl PeersInfo for NoopNetwork {
impl Peers for NoopNetwork { impl Peers for NoopNetwork {
fn add_trusted_peer_id(&self, _peer: PeerId) {} fn add_trusted_peer_id(&self, _peer: PeerId) {}
fn add_peer_kind(&self, _peer: PeerId, _kind: PeerKind, _addr: SocketAddr) {} fn add_peer_kind(
&self,
_peer: PeerId,
_kind: PeerKind,
_tcp_addr: SocketAddr,
_udp_addr: Option<SocketAddr>,
) {
}
async fn get_peers_by_kind(&self, _kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> { async fn get_peers_by_kind(&self, _kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
Ok(vec![]) Ok(vec![])

View File

@ -4,6 +4,7 @@ use crate::{
cache::LruMap, cache::LruMap,
error::{NetworkError, ServiceKind}, error::{NetworkError, ServiceKind},
manager::DiscoveredEvent, manager::DiscoveredEvent,
peers::PeerAddr,
}; };
use enr::Enr; use enr::Enr;
use futures::StreamExt; use futures::StreamExt;
@ -40,7 +41,7 @@ pub struct Discovery {
/// All nodes discovered via discovery protocol. /// All nodes discovered via discovery protocol.
/// ///
/// These nodes can be ephemeral and are updated via the discovery protocol. /// These nodes can be ephemeral and are updated via the discovery protocol.
discovered_nodes: LruMap<PeerId, SocketAddr>, discovered_nodes: LruMap<PeerId, PeerAddr>,
/// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]). /// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]).
local_enr: NodeRecord, local_enr: NodeRecord,
/// Handler to interact with the Discovery v4 service /// Handler to interact with the Discovery v4 service
@ -204,12 +205,14 @@ impl Discovery {
/// Processes an incoming [`NodeRecord`] update from a discovery service /// Processes an incoming [`NodeRecord`] update from a discovery service
fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) { fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
let id = record.id; let peer_id = record.id;
let addr = record.tcp_addr(); let tcp_addr = record.tcp_addr();
let udp_addr = record.udp_addr();
let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
_ = _ =
self.discovered_nodes.get_or_insert(id, || { self.discovered_nodes.get_or_insert(peer_id, || {
self.queued_events.push_back(DiscoveryEvent::NewNode( self.queued_events.push_back(DiscoveryEvent::NewNode(
DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id }, DiscoveredEvent::EventQueued { peer_id, addr, fork_id },
)); ));
addr addr
@ -224,8 +227,8 @@ impl Discovery {
DiscoveryUpdate::EnrForkId(node, fork_id) => { DiscoveryUpdate::EnrForkId(node, fork_id) => {
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id)) self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
} }
DiscoveryUpdate::Removed(node) => { DiscoveryUpdate::Removed(peer_id) => {
self.discovered_nodes.remove(&node); self.discovered_nodes.remove(&peer_id);
} }
DiscoveryUpdate::Batch(updates) => { DiscoveryUpdate::Batch(updates) => {
for update in updates { for update in updates {
@ -427,7 +430,7 @@ mod tests {
assert_eq!( assert_eq!(
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
peer_id: discv4_id_2, peer_id: discv4_id_2,
socket_addr: discv4_enr_2.tcp_addr(), addr: PeerAddr::new(discv4_enr_2.tcp_addr(), Some(discv4_enr_2.udp_addr())),
fork_id: None fork_id: None
}), }),
event_node_1 event_node_1
@ -435,7 +438,7 @@ mod tests {
assert_eq!( assert_eq!(
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
peer_id: discv4_id_1, peer_id: discv4_id_1,
socket_addr: discv4_enr_1.tcp_addr(), addr: PeerAddr::new(discv4_enr_1.tcp_addr(), Some(discv4_enr_1.udp_addr())),
fork_id: None fork_id: None
}), }),
event_node_2 event_node_2

View File

@ -26,7 +26,7 @@ use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage}, network::{NetworkHandle, NetworkHandleMessage},
peers::{PeersHandle, PeersManager}, peers::{PeerAddr, PeersHandle, PeersManager},
poll_nested_stream_with_budget, poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol, protocol::IntoRlpxSubProtocol,
session::SessionManager, session::SessionManager,
@ -1030,7 +1030,7 @@ pub enum NetworkEvent {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent { pub enum DiscoveredEvent {
EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> }, EventQueued { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]

View File

@ -1,7 +1,13 @@
use crate::{ use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest, config::NetworkMode,
peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, discovery::DiscoveryEvent,
transactions::TransactionsHandle, FetchClient, manager::NetworkEvent,
message::PeerRequest,
peers::{PeerAddr, PeersHandle},
protocol::RlpxSubProtocol,
swarm::NetworkConnectionState,
transactions::TransactionsHandle,
FetchClient,
}; };
use enr::Enr; use enr::Enr;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -257,7 +263,14 @@ impl Peers for NetworkHandle {
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set, with the given kind. /// set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) { fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
let addr = PeerAddr::new(tcp_addr, udp_addr);
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr)); self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
} }
@ -420,7 +433,7 @@ pub(crate) enum NetworkHandleMessage {
/// Marks a peer as trusted. /// Marks a peer as trusted.
AddTrustedPeerId(PeerId), AddTrustedPeerId(PeerId),
/// Adds an address for a peer, including its ID, kind, and socket address. /// Adds an address for a peer, including its ID, kind, and socket address.
AddPeerAddress(PeerId, PeerKind, SocketAddr), AddPeerAddress(PeerId, PeerKind, PeerAddr),
/// Removes a peer from the peerset corresponding to the given kind. /// Removes a peer from the peerset corresponding to the given kind.
RemovePeer(PeerId, PeerKind), RemovePeer(PeerId, PeerKind),
/// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason. /// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.

View File

@ -155,13 +155,17 @@ impl PeersManager {
let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len()); let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len()); let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes { for NodeRecord { address, tcp_port, udp_port, id } in trusted_nodes {
trusted_peer_ids.insert(id); trusted_peer_ids.insert(id);
peers.entry(id).or_insert_with(|| Peer::trusted(SocketAddr::from((address, tcp_port)))); peers.entry(id).or_insert_with(|| {
Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
});
} }
for NodeRecord { address, tcp_port, udp_port: _, id } in basic_nodes { for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port)))); peers.entry(id).or_insert_with(|| {
Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
});
} }
Self { Self {
@ -198,7 +202,27 @@ impl PeersManager {
/// Returns an iterator over all peers /// Returns an iterator over all peers
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ { pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id)) self.peers.iter().map(|(peer_id, v)| {
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
*peer_id,
)
})
}
/// Returns the [`NodeRecord`] for the given peer id
#[allow(dead_code)]
fn peer_by_id(&self, peer_id: PeerId) -> Option<NodeRecord> {
self.peers.get(&peer_id).map(|v| {
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
peer_id,
)
})
} }
/// Returns an iterator over all peer ids for peers with the given kind /// Returns an iterator over all peer ids for peers with the given kind
@ -329,7 +353,7 @@ impl PeersManager {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
// peer is missing in the table, we add it but mark it as to be removed after // peer is missing in the table, we add it but mark it as to be removed after
// disconnect, because we only know the outgoing port // disconnect, because we only know the outgoing port
let mut peer = Peer::with_state(addr, PeerConnectionState::In); let mut peer = Peer::with_state(PeerAddr::tcp(addr), PeerConnectionState::In);
peer.remove_after_disconnect = true; peer.remove_after_disconnect = true;
entry.insert(peer); entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
@ -654,7 +678,7 @@ impl PeersManager {
/// Called for a newly discovered peer. /// Called for a newly discovered peer.
/// ///
/// If the peer already exists, then the address, kind and `fork_id` will be updated. /// If the peer already exists, then the address, kind and `fork_id` will be updated.
pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: SocketAddr, fork_id: Option<ForkId>) { pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id) self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
} }
@ -667,7 +691,7 @@ impl PeersManager {
/// ///
/// If the peer already exists, then the address and kind will be updated. /// If the peer already exists, then the address and kind will be updated.
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: SocketAddr) { pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None) self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
} }
@ -678,10 +702,10 @@ impl PeersManager {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
kind: PeerKind, kind: PeerKind,
addr: SocketAddr, addr: PeerAddr,
fork_id: Option<ForkId>, fork_id: Option<ForkId>,
) { ) {
if self.ban_list.is_banned(&peer_id, &addr.ip()) { if self.ban_list.is_banned(&peer_id, &addr.tcp.ip()) {
return return
} }
@ -700,7 +724,7 @@ impl PeersManager {
} }
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
trace!(target: "net::peers", ?peer_id, ?addr, "discovered new node"); trace!(target: "net::peers", ?peer_id, ?addr.tcp, "discovered new node");
let mut peer = Peer::with_kind(addr, kind); let mut peer = Peer::with_kind(addr, kind);
peer.fork_id = fork_id; peer.fork_id = fork_id;
entry.insert(peer); entry.insert(peer);
@ -804,7 +828,7 @@ impl PeersManager {
return return
} }
// as long as there a slots available fill them with the best peers // as long as there are slots available fill them with the best peers
while self.connection_info.has_out_capacity() { while self.connection_info.has_out_capacity() {
let action = { let action = {
let (peer_id, peer) = match self.best_unconnected() { let (peer_id, peer) = match self.best_unconnected() {
@ -815,7 +839,7 @@ impl PeersManager {
trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection"); trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
peer.state = PeerConnectionState::PendingOut; peer.state = PeerConnectionState::PendingOut;
PeerAction::Connect { peer_id, remote_addr: peer.addr } PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp }
}; };
self.connection_info.inc_pending_out(); self.connection_info.inc_pending_out();
@ -853,7 +877,7 @@ impl PeersManager {
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd { match cmd {
PeerCommand::Add(peer_id, addr) => { PeerCommand::Add(peer_id, addr) => {
self.add_peer(peer_id, addr, None); self.add_peer(peer_id, PeerAddr::tcp(addr), None);
} }
PeerCommand::Remove(peer) => self.remove_peer(peer), PeerCommand::Remove(peer) => self.remove_peer(peer),
PeerCommand::ReputationChange(peer_id, rep) => { PeerCommand::ReputationChange(peer_id, rep) => {
@ -985,11 +1009,43 @@ impl ConnectionInfo {
} }
} }
/// Represents a peer's address information.
///
/// # Fields
///
/// - `tcp`: A `SocketAddr` representing the peer's data transfer address.
/// - `udp`: An optional `SocketAddr` representing the peer's discover address. `None` if the peer
/// is directly connecting to us or the port is the same to `tcp`'s
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PeerAddr {
tcp: SocketAddr,
udp: Option<SocketAddr>,
}
impl PeerAddr {
/// Returns a new `PeerAddr` with the given `tcp` and `udp` addresses.
pub const fn new(tcp: SocketAddr, udp: Option<SocketAddr>) -> Self {
Self { tcp, udp }
}
/// Returns a new `PeerAddr` with a `tcp` address only.
pub const fn tcp(tcp: SocketAddr) -> Self {
Self { tcp, udp: None }
}
/// Returns a new `PeerAddr` with the given `tcp` and `udp` ports.
fn new_with_ports(ip: IpAddr, tcp_port: u16, udp_port: Option<u16>) -> Self {
let tcp = SocketAddr::new(ip, tcp_port);
let udp = udp_port.map(|port| SocketAddr::new(ip, port));
Self::new(tcp, udp)
}
}
/// Tracks info about a single peer. /// Tracks info about a single peer.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Peer { pub struct Peer {
/// Where to reach the peer /// Where to reach the peer.
addr: SocketAddr, addr: PeerAddr,
/// Reputation of the peer. /// Reputation of the peer.
reputation: i32, reputation: i32,
/// The state of the connection, if any. /// The state of the connection, if any.
@ -1010,11 +1066,11 @@ pub struct Peer {
// === impl Peer === // === impl Peer ===
impl Peer { impl Peer {
fn new(addr: SocketAddr) -> Self { fn new(addr: PeerAddr) -> Self {
Self::with_state(addr, Default::default()) Self::with_state(addr, Default::default())
} }
fn trusted(addr: SocketAddr) -> Self { fn trusted(addr: PeerAddr) -> Self {
Self { kind: PeerKind::Trusted, ..Self::new(addr) } Self { kind: PeerKind::Trusted, ..Self::new(addr) }
} }
@ -1023,7 +1079,7 @@ impl Peer {
self.reputation self.reputation
} }
fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self { fn with_state(addr: PeerAddr, state: PeerConnectionState) -> Self {
Self { Self {
addr, addr,
state, state,
@ -1036,7 +1092,7 @@ impl Peer {
} }
} }
fn with_kind(addr: SocketAddr, kind: PeerKind) -> Self { fn with_kind(addr: PeerAddr, kind: PeerKind) -> Self {
Self { kind, ..Self::new(addr) } Self { kind, ..Self::new(addr) }
} }
@ -1257,7 +1313,7 @@ mod tests {
use super::PeersManager; use super::PeersManager;
use crate::{ use crate::{
peers::{ peers::{
ConnectionInfo, InboundConnectionError, PeerAction, PeerBackoffDurations, ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
PeerConnectionState, PeerConnectionState,
}, },
session::PendingSessionHandshakeError, session::PendingSessionHandshakeError,
@ -1306,7 +1362,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1321,6 +1377,37 @@ mod tests {
} }
_ => unreachable!(), _ => unreachable!(),
} }
let record = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
}
#[tokio::test]
async fn test_insert_udp() {
let peer = PeerId::random();
let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, tcp_addr);
}
_ => unreachable!(),
}
let record = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), tcp_addr);
assert_eq!(record.udp_addr(), udp_addr);
} }
#[tokio::test] #[tokio::test]
@ -1329,7 +1416,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.ban_peer(peer); peers.ban_peer(peer);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::BanPeer { peer_id } => { PeerAction::BanPeer { peer_id } => {
@ -1351,7 +1438,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.ban_peer(peer); peers.ban_peer(peer);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::BanPeer { peer_id } => { PeerAction::BanPeer { peer_id } => {
@ -1388,7 +1475,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::new(PeersConfig::test()); let mut peers = PeersManager::new(PeersConfig::test());
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1447,7 +1534,7 @@ mod tests {
let backoff_durations = PeerBackoffDurations::test(); let backoff_durations = PeerBackoffDurations::test();
let config = PeersConfig { backoff_durations, ..PeersConfig::test() }; let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
let mut peers = PeersManager::new(config); let mut peers = PeersManager::new(config);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1504,7 +1591,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test(); let config = PeersConfig::test();
let mut peers = PeersManager::new(config); let mut peers = PeersManager::new(config);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap(); let peer_struct = peers.peers.get_mut(&peer).unwrap();
let backoff_timestamp = peers let backoff_timestamp = peers
@ -1521,7 +1608,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::default(); let config = PeersConfig::default();
let mut peers = PeersManager::new(config); let mut peers = PeersManager::new(config);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap(); let peer_struct = peers.peers.get_mut(&peer).unwrap();
// Simulate a peer that was already backed off once // Simulate a peer that was already backed off once
@ -1549,7 +1636,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1606,7 +1693,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test(); let config = PeersConfig::test();
let mut peers = PeersManager::new(config.clone()); let mut peers = PeersManager::new(config.clone());
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap(); let peer_struct = peers.peers.get_mut(&peer).unwrap();
// Simulate a peer that was already backed off once // Simulate a peer that was already backed off once
@ -1660,7 +1747,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1772,7 +1859,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1890,7 +1977,7 @@ mod tests {
// to increase by 1 // to increase by 1
peers.on_incoming_session_established(peer, socket_addr); peers.on_incoming_session_established(peer, socket_addr);
let p = peers.peers.get_mut(&peer).expect("peer not found"); let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr); assert_eq!(p.addr.tcp, socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1); assert_eq!(peers.connection_info.num_inbound, 1);
@ -1905,7 +1992,7 @@ mod tests {
peers.on_already_connected(Direction::Incoming); peers.on_already_connected(Direction::Incoming);
let p = peers.peers.get_mut(&peer).expect("peer not found"); let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr); assert_eq!(p.addr.tcp, socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1); assert_eq!(peers.connection_info.num_inbound, 1);
} }
@ -1915,7 +2002,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_trusted_peer(peer, socket_addr); peers.add_trusted_peer(peer, PeerAddr::tcp(socket_addr));
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -1967,7 +2054,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
assert_eq!(peers.get_reputation(&peer), Some(0)); assert_eq!(peers.get_reputation(&peer), Some(0));
peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024)); peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
@ -1982,7 +2069,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -2019,7 +2106,7 @@ mod tests {
let p = peers.peers.get(&peer).unwrap(); let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut); assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
let p = peers.peers.get(&peer).unwrap(); let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut); assert_eq!(p.state, PeerConnectionState::PendingOut);
@ -2032,7 +2119,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -2067,7 +2154,7 @@ mod tests {
let peer = PeerId::random(); let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None); peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -2101,7 +2188,7 @@ mod tests {
let ban_list = BanList::new(HashSet::new(), vec![ip]); let ban_list = BanList::new(HashSet::new(), vec![ip]);
let config = PeersConfig::default().with_ban_list(ban_list); let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config); let mut peer_manager = PeersManager::new(config);
peer_manager.add_peer(B512::default(), socket_addr, None); peer_manager.add_peer(B512::default(), PeerAddr::tcp(socket_addr), None);
assert!(peer_manager.peers.is_empty()); assert!(peer_manager.peers.is_empty());
} }
@ -2204,7 +2291,7 @@ mod tests {
let basic_peer = PeerId::random(); let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, basic_sock, None); peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -2244,7 +2331,7 @@ mod tests {
let basic_peer = PeerId::random(); let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, basic_sock, None); peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(peer_id) => { PeerAction::PeerAdded(peer_id) => {
@ -2352,7 +2439,7 @@ mod tests {
let config = PeersConfig::test(); let config = PeersConfig::test();
let mut peer_manager = PeersManager::new(config); let mut peer_manager = PeersManager::new(config);
let peer_id = PeerId::random(); let peer_id = PeerId::random();
peer_manager.add_peer(peer_id, socket_addr, None); peer_manager.add_peer(peer_id, PeerAddr::tcp(socket_addr), None);
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
peer_manager.tick(); peer_manager.tick();
@ -2407,7 +2494,7 @@ mod tests {
assert!(peer.remove_after_disconnect); assert!(peer.remove_after_disconnect);
// trigger discovery manually while the peer is still connected // trigger discovery manually while the peer is still connected
peers.add_peer(peer_id, addr, None); peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
peers.on_active_session_gracefully_closed(peer_id); peers.on_active_session_gracefully_closed(peer_id);
@ -2423,7 +2510,7 @@ mod tests {
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap(); peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, addr, None); peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(_) => {} PeerAction::PeerAdded(_) => {}
@ -2451,7 +2538,7 @@ mod tests {
let mut peers = PeersManager::default(); let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap(); peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, addr, None); peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
match event!(peers) { match event!(peers) {
PeerAction::PeerAdded(_) => {} PeerAction::PeerAdded(_) => {}
@ -2482,9 +2569,9 @@ mod tests {
let config = PeersConfig::default(); let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config); let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008); let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008));
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None); peer_manager.add_peer(PeerId::random(), peer_addr, None);
} }
peer_manager.fill_outbound_slots(); peer_manager.fill_outbound_slots();
@ -2501,11 +2588,11 @@ mod tests {
let config = PeersConfig::default(); let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config); let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008); let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008));
// add more peers than allowed // add more peers than allowed
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None); peer_manager.add_peer(PeerId::random(), peer_addr, None);
} }
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {

View File

@ -9,7 +9,7 @@ use crate::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult, PeerResponseResult,
}, },
peers::{PeerAction, PeersManager}, peers::{PeerAction, PeerAddr, PeersManager},
FetchClient, FetchClient,
}; };
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
@ -274,7 +274,7 @@ where
} }
/// Adds a peer and its address with the given kind to the peerset. /// Adds a peer and its address with the given kind to the peerset.
pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) { pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
self.peers_manager.add_peer_kind(peer_id, kind, addr, None) self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
} }
@ -288,14 +288,10 @@ where
/// Event hook for events received from the discovery service. /// Event hook for events received from the discovery service.
fn on_discovery_event(&mut self, event: DiscoveryEvent) { fn on_discovery_event(&mut self, event: DiscoveryEvent) {
match event { match event {
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
peer_id,
socket_addr,
fork_id,
}) => {
self.queued_messages.push_back(StateAction::DiscoveredNode { self.queued_messages.push_back(StateAction::DiscoveredNode {
peer_id, peer_id,
socket_addr, addr,
fork_id, fork_id,
}); });
} }
@ -516,7 +512,7 @@ pub(crate) enum StateAction {
fork_id: ForkId, fork_id: ForkId,
}, },
/// A new node was found through the discovery, possibly with a `ForkId` /// A new node was found through the discovery, possibly with a `ForkId`
DiscoveredNode { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> }, DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
/// A peer was added /// A peer was added
PeerAdded(PeerId), PeerAdded(PeerId),
/// A peer was dropped /// A peer was dropped

View File

@ -247,14 +247,14 @@ where
} }
StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)), StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)), StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
StateAction::DiscoveredNode { peer_id, socket_addr, fork_id } => { StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
// Don't try to connect to peer if node is shutting down // Don't try to connect to peer if node is shutting down
if self.is_shutting_down() { if self.is_shutting_down() {
return None return None
} }
// Insert peer only if no fork id or a valid fork id // Insert peer only if no fork id or a valid fork id
if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) { if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
self.state_mut().peers_mut().add_peer(peer_id, socket_addr, fork_id); self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
} }
} }
StateAction::DiscoveredEnrForkId { peer_id, fork_id } => { StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {

View File

@ -92,6 +92,17 @@ impl NodeRecord {
Self { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), id } Self { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), id }
} }
/// Creates a new record from an ip address and ports.
pub fn new_with_ports(
ip_addr: IpAddr,
tcp_port: u16,
udp_port: Option<u16>,
id: PeerId,
) -> Self {
let udp_port = udp_port.unwrap_or(tcp_port);
Self { address: ip_addr, tcp_port, udp_port, id }
}
/// The TCP socket address of this node /// The TCP socket address of this node
#[must_use] #[must_use]
pub const fn tcp_addr(&self) -> SocketAddr { pub const fn tcp_addr(&self) -> SocketAddr {

View File

@ -38,7 +38,7 @@ where
{ {
/// Handler for `admin_addPeer` /// Handler for `admin_addPeer`
fn add_peer(&self, record: NodeRecord) -> RpcResult<bool> { fn add_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.add_peer(record.id, record.tcp_addr()); self.network.add_peer_with_udp(record.id, record.tcp_addr(), record.udp_addr());
Ok(true) Ok(true)
} }
@ -51,7 +51,7 @@ where
/// Handler for `admin_addTrustedPeer` /// Handler for `admin_addTrustedPeer`
fn add_trusted_peer(&self, record: AnyNode) -> RpcResult<bool> { fn add_trusted_peer(&self, record: AnyNode) -> RpcResult<bool> {
if let Some(record) = record.node_record() { if let Some(record) = record.node_record() {
self.network.add_trusted_peer(record.id, record.tcp_addr()) self.network.add_trusted_peer_with_udp(record.id, record.tcp_addr(), record.udp_addr())
} }
self.network.add_trusted_peer_id(record.peer_id()); self.network.add_trusted_peer_id(record.peer_id());
Ok(true) Ok(true)