feat(net): Make PeerManager emit peerevents for admin rpc (#602)

* Make PeerManager emit peerevents for admin rpc

* Fix tests to include new peer events
This commit is contained in:
Sanket Shanbhag
2022-12-25 14:30:10 +05:30
committed by GitHub
parent 19a618e3a4
commit 73e12341c4
5 changed files with 98 additions and 4 deletions

View File

@ -560,6 +560,14 @@ where
messages,
});
}
SwarmEvent::PeerAdded(peer_id) => {
info!(target: "net", ?peer_id, "Peer added");
this.event_listeners.send(NetworkEvent::PeerAdded(peer_id));
}
SwarmEvent::PeerRemoved(peer_id) => {
info!(target: "net", ?peer_id, "Peer dropped");
this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id));
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
trace!(

View File

@ -72,7 +72,7 @@ impl PeersHandle {
pub(crate) struct PeersManager {
/// All peers known to the network
peers: HashMap<PeerId, Peer>,
/// Copy of the receiver half, so new [`PeersHandle`] can be created on demand.
/// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
manager_tx: mpsc::UnboundedSender<PeerCommand>,
/// Receiver half of the command channel.
handle_rx: UnboundedReceiverStream<PeerCommand>,
@ -209,6 +209,7 @@ impl PeersManager {
}
Entry::Vacant(entry) => {
entry.insert(Peer::with_state(addr, PeerConnectionState::In));
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
}
@ -290,6 +291,7 @@ impl PeersManager {
if entry.get().remove_after_disconnect {
// this peer should be removed from the set
entry.remove();
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
} else {
entry.get_mut().state = PeerConnectionState::Idle;
return
@ -329,6 +331,7 @@ impl PeersManager {
// issues.
if let Some(peer) = self.peers.remove(peer_id) {
self.connection_info.decr_state(peer.state);
self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
}
// ban the peer
@ -404,6 +407,7 @@ impl PeersManager {
Entry::Vacant(entry) => {
trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node");
entry.insert(Peer::new(addr));
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
@ -414,6 +418,7 @@ impl PeersManager {
pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) {
if let Some(mut peer) = self.peers.remove(&peer_id) {
trace!(target : "net::peers", ?peer_id, "remove discovered node");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
if peer.state.is_connected() {
debug!(target : "net::peers", ?peer_id, "disconnecting on remove from discovery");
@ -767,6 +772,10 @@ pub enum PeerAction {
BanPeer { peer_id: PeerId },
/// Unban the peer temporarily
UnBanPeer { peer_id: PeerId },
/// Emit peerAdded event
PeerAdded(PeerId),
/// Emit peerRemoved event
PeerRemoved(PeerId),
}
/// Config type for initiating a [`PeersManager`] instance
@ -901,6 +910,12 @@ mod test {
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
@ -942,6 +957,12 @@ mod test {
let mut peers = PeersManager::new(config);
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
@ -996,6 +1017,12 @@ mod test {
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
@ -1017,6 +1044,12 @@ mod test {
)),
);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
@ -1040,6 +1073,12 @@ mod test {
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
@ -1061,6 +1100,12 @@ mod test {
)),
);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
@ -1140,6 +1185,12 @@ mod test {
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
@ -1178,6 +1229,12 @@ mod test {
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
@ -1191,6 +1248,12 @@ mod test {
peers.remove_discovered_node(peer);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);

View File

@ -287,6 +287,12 @@ where
self.ban_discovery(peer_id, ip_addr)
}
PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
PeerAction::PeerAdded(peer_id) => {
self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
}
PeerAction::PeerRemoved(peer_id) => {
self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
}
PeerAction::BanPeer { .. } => {}
PeerAction::UnBanPeer { .. } => {}
}
@ -464,4 +470,8 @@ pub(crate) enum StateAction {
/// The reported [`ForkId`] by this peer.
fork_id: ForkId,
},
/// A peer was added
PeerAdded(PeerId),
/// A peer was dropped
PeerRemoved(PeerId),
}

View File

@ -231,6 +231,8 @@ where
let msg = PeerMessage::NewBlockHashes(hashes);
self.sessions.send_message(&peer_id, msg);
}
StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
@ -352,6 +354,10 @@ pub(crate) enum SwarmEvent {
/// Whether the session was closed due to an error
error: Option<EthStreamError>,
},
/// Admin rpc: new peer added
PeerAdded(PeerId),
/// Admin rpc: peer removed
PeerRemoved(PeerId),
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed {
remote_addr: SocketAddr,

View File

@ -54,9 +54,10 @@ async fn test_establish_connections() {
handle0.add_peer(*handle2.peer_id(), handle2.local_addr());
let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]);
let mut expected_peers = expected_connections.clone();
// wait for all initiator connections
let mut established = listener0.take(2);
let mut established = listener0.take(4);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionClosed { .. } => {
@ -66,7 +67,7 @@ async fn test_establish_connections() {
assert!(expected_connections.remove(&peer_id))
}
NetworkEvent::PeerAdded(peer_id) => {
assert!(!expected_connections.contains(&peer_id))
assert!(expected_peers.remove(&peer_id))
}
NetworkEvent::PeerRemoved(_) => {
panic!("unexpected event")
@ -74,6 +75,7 @@ async fn test_establish_connections() {
}
}
assert!(expected_connections.is_empty());
assert!(expected_peers.is_empty());
// also await the established session on both target
futures::future::join(listener1.next(), listener2.next()).await;
@ -365,9 +367,14 @@ async fn test_geth_disconnect() {
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// add geth as a peer then wait for a `SessionEstablished` event
// add geth as a peer then wait for `PeerAdded` and `SessionEstablished` events.
handle.add_peer(geth_peer_id, geth_socket);
match events.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id),
_ => panic!("Expected a peer added event"),
}
if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {