fix: increment pending out on manually triggered connect (#14257)

Author: int88
Date: 2025-02-07 19:35:14 +08:00
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Committed by: GitHub
Parent: b8a4468c69
Commit: 6dfcb7a85f
3 changed files with 72 additions and 1 deletion


@@ -789,6 +789,8 @@ impl PeersManager {
}
/// Connects a peer and its address with the given kind.
///
/// Note: This is invoked on demand via an external command received by the manager
pub(crate) fn add_and_connect_kind(
&mut self,
peer_id: PeerId,
@@ -807,6 +809,7 @@ impl PeersManager {
peer.state = PeerConnectionState::PendingOut;
peer.fork_id = fork_id;
entry.insert(peer);
self.connection_info.inc_pending_out();
self.queued_actions
.push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
}
@@ -2786,4 +2789,14 @@ mod tests {
peers.on_active_session_gracefully_closed(peer);
assert_eq!(peers.connection_info.num_inbound, 0);
}
#[tokio::test]
async fn test_add_pending_connect() {
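// Manually adding and connecting a peer should mark it PendingOut and bump the
// pending-outbound counter (the fix above).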
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
assert_eq!(peers.connection_info.num_pending_out, 1);
}
}
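For context, the one-line fix above hinges on the connection counters kept by PeersManager. Below is a minimal sketch of the assumed bookkeeping; the names num_pending_out, num_inbound and inc_pending_out are taken from the diff itself, while decr_pending_out and the surrounding structure are illustrative assumptions rather than the actual reth implementation.

#[derive(Debug, Default)]
struct ConnectionInfo {
    /// Outbound connections currently being dialed but not yet active.
    num_pending_out: usize,
    /// Active inbound sessions (referenced by the existing test above).
    num_inbound: usize,
}

impl ConnectionInfo {
    /// Bumps the pending-outbound counter whenever a PeerAction::Connect is queued.
    /// Before this commit, add_and_connect_kind queued the dial without calling
    /// this, so manually triggered connects were never counted.
    fn inc_pending_out(&mut self) {
        self.num_pending_out += 1;
    }

    /// Assumed counterpart: decrements once the dial resolves (session established
    /// or failed), keeping the counter balanced.
    fn decr_pending_out(&mut self) {
        self.num_pending_out = self.num_pending_out.saturating_sub(1);
    }
}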


@@ -763,4 +763,24 @@ impl NetworkEventStream {
_ => None,
}
}
/// Awaits the next peer added event, returning the peer id.
pub async fn peer_added(&mut self) -> Option<PeerId> {
let peer_id = match self.inner.next().await {
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
_ => return None,
};
Some(peer_id)
}
/// Awaits the next peer removed event, returning the peer id.
pub async fn peer_removed(&mut self) -> Option<PeerId> {
let peer_id = match self.inner.next().await {
Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
_ => return None,
};
Some(peer_id)
}
}
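These two helpers are exercised by the new test_connect_peer_in_different_network_should_fail integration test below, which waits for the peer to be added and then removed once the cross-network connection fails.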


@@ -4,7 +4,7 @@ use alloy_node_bindings::Geth;
use alloy_primitives::map::HashSet;
use alloy_provider::{ext::AdminApi, ProviderBuilder};
use futures::StreamExt;
use reth_chainspec::MAINNET;
use reth_chainspec::{MAINNET, SEPOLIA};
use reth_discv4::Discv4Config;
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, HeadersDirection};
use reth_net_banlist::BanList;
@@ -856,3 +856,41 @@ async fn test_disconnect_then_connect() {
let peer = listener0.next_session_established().await.unwrap();
assert_eq!(peer, *handle1.peer_id());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_connect_peer_in_different_network_should_fail() {
reth_tracing::init_test_tracing();
// Peer on mainnet.
let peer = new_random_peer(10, vec![]).await;
let peer_handle = peer.handle().clone();
tokio::task::spawn(peer);
// Peer on Sepolia.
let secret_key = SecretKey::new(&mut rand::thread_rng());
// If the remote disconnects first, we would not get a fatal protocol error, so set
// max_backoff_count to 0 to speed up removal of the peer.
let peers_config = PeersConfig::default().with_max_backoff_count(0);
let config = NetworkConfigBuilder::eth(secret_key)
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)
.build_with_noop_provider(SEPOLIA.clone());
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
// Create a NetworkEventStream to easily await the next peer events.
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
handle.add_peer(*peer_handle.peer_id(), peer_handle.local_addr());
let added_peer_id = event_stream.peer_added().await.unwrap();
assert_eq!(added_peer_id, *peer_handle.peer_id());
let removed_peer_id = event_stream.peer_removed().await.unwrap();
assert_eq!(removed_peer_id, *peer_handle.peer_id());
}