feat : add the ability to connect to a peer (#10028)

Co-authored-by: Emilia Hane <emiliaha95@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
nk_ysg
2024-08-24 15:16:34 +08:00
committed by GitHub
parent a7a9f9f3b3
commit 22f928a2b1
7 changed files with 150 additions and 0 deletions

View File

@ -184,6 +184,21 @@ pub trait Peers: PeersInfo {
/// Disconnect an existing connection to the given peer using the provided reason
fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason);
/// Connect to the given peer. NOTE: if the maximum number out outbound sessions is reached,
/// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
fn connect_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.connect_peer_kind(peer, PeerKind::Static, tcp_addr, None)
}
/// Connects a peer to the known peer set, with the given kind.
fn connect_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
);
/// Send a reputation change for the given peer.
fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind);

View File

@ -102,6 +102,15 @@ impl Peers for NoopNetwork {
fn disconnect_peer_with_reason(&self, _peer: PeerId, _reason: DisconnectReason) {}
fn connect_peer_kind(
&self,
_peer: PeerId,
_kind: PeerKind,
_tcp_addr: SocketAddr,
_udp_addr: Option<SocketAddr>,
) {
}
fn reputation_change(&self, _peer_id: PeerId, _kind: ReputationChangeKind) {}
async fn reputation_by_id(&self, _peer_id: PeerId) -> Result<Option<Reputation>, NetworkError> {

View File

@ -588,6 +588,9 @@ impl NetworkManager {
NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
self.swarm.sessions_mut().disconnect(peer_id, reason);
}
NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
}
NetworkHandleMessage::SetNetworkState(net_state) => {
// Sets network connection state between Active and Hibernate.
// If hibernate stops the node to fill new outbound

View File

@ -301,6 +301,22 @@ impl Peers for NetworkHandle {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to connect to the given
/// peer.
fn connect_peer_kind(
&self,
peer_id: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
self.send_message(NetworkHandleMessage::ConnectPeer(
peer_id,
kind,
PeerAddr::new(tcp_addr, udp_addr),
))
}
/// Send a reputation change for the given peer.
fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
@ -481,4 +497,6 @@ pub(crate) enum NetworkHandleMessage {
DiscoveryListener(UnboundedSender<DiscoveryEvent>),
/// Adds an additional `RlpxSubProtocol`.
AddRlpxSubProtocol(RlpxSubProtocol),
/// Connect to the given peer.
ConnectPeer(PeerId, PeerKind, PeerAddr),
}

View File

@ -729,6 +729,48 @@ impl PeersManager {
}
}
/// Connect to the given peer. NOTE: if the maximum number out outbound sessions is reached,
/// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
#[allow(dead_code)]
pub(crate) fn add_and_connect(
&mut self,
peer_id: PeerId,
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
}
/// Connects a peer and its address with the given kind.
pub(crate) fn add_and_connect_kind(
&mut self,
peer_id: PeerId,
kind: PeerKind,
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
return
}
match self.peers.entry(peer_id) {
Entry::Vacant(entry) => {
trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
let mut peer = Peer::with_kind(addr, kind);
peer.state = PeerConnectionState::PendingOut;
peer.fork_id = fork_id;
entry.insert(peer);
self.queued_actions
.push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
}
_ => return,
}
if kind.is_trusted() {
self.trusted_peer_ids.insert(peer_id);
}
}
/// Removes the tracked node from the trusted set.
pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
@ -2381,4 +2423,32 @@ mod tests {
// no more pending outbound connections
assert_eq!(peer_manager.connection_info.num_pending_out, 0);
}
#[tokio::test]
async fn test_connect() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
// connect again
peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
}
}

View File

@ -296,6 +296,11 @@ impl NetworkState {
self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
}
/// Connects a peer and its address with the given kind
pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
}
/// Removes a peer and its address with the given kind from the peerset.
pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
match kind {

View File

@ -710,3 +710,33 @@ async fn test_connect_many() {
assert_eq!(peer.network().num_connected_peers(), 4);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_disconnect_then_connect() {
reth_tracing::init_test_tracing();
let net = Testnet::create(2).await;
net.for_each(|peer| assert_eq!(0, peer.num_peers()));
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
drop(handles);
let _handle = net.spawn();
let mut listener0 = NetworkEventStream::new(handle0.event_listener());
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let peer = listener0.next_session_established().await.unwrap();
assert_eq!(peer, *handle1.peer_id());
handle0.disconnect_peer(*handle1.peer_id());
let (peer, _) = listener0.next_session_closed().await.unwrap();
assert_eq!(peer, *handle1.peer_id());
handle0.connect_peer(*handle1.peer_id(), handle1.local_addr());
let peer = listener0.next_session_established().await.unwrap();
assert_eq!(peer, *handle1.peer_id());
}