refactor: split NetworkEventListenerProvider (#12972)

This commit is contained in:
Léa Narzis
2024-12-04 19:27:58 +01:00
committed by GitHub
parent 0daa456f3a
commit fbd2d6eeda
15 changed files with 346 additions and 254 deletions

View File

@ -44,7 +44,9 @@ use reth_eth_wire::{
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
events::{PeerEvent, SessionInfo},
test_utils::PeersHandle,
EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
@ -712,24 +714,26 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.update_active_connection_metrics();
self.event_sender.notify(NetworkEvent::SessionEstablished {
let session_info = SessionInfo {
peer_id,
remote_addr,
client_version,
capabilities,
version,
status,
messages,
});
version,
};
self.event_sender
.notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::PeerAdded(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
@ -772,7 +776,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.saturating_sub(1)
as f64,
);
self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason });
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
trace!(

View File

@ -4,6 +4,7 @@ use crate::{
};
use alloy_primitives::B256;
use enr::Enr;
use futures::StreamExt;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
@ -13,6 +14,7 @@ use reth_eth_wire::{
};
use reth_ethereum_forks::Head;
use reth_network_api::{
events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
@ -192,6 +194,17 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {
// === API Implementations ===
impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
/// Returns an event stream of peer-specific network events.
fn peer_events(&self) -> PeerEventStream {
let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
});
PeerEventStream::new(peer_events)
}
}
impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
self.inner.event_sender.new_listener()

View File

@ -13,6 +13,7 @@ use pin_project::pin_project;
use reth_chainspec::{Hardforks, MAINNET};
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
};
@ -641,7 +642,9 @@ impl NetworkEventStream {
pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)),
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => {
return Some((peer_id, reason))
}
_ => continue,
}
}
@ -652,7 +655,10 @@ impl NetworkEventStream {
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id),
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
return Some(info.peer_id)
}
_ => continue,
}
}
@ -667,7 +673,7 @@ impl NetworkEventStream {
let mut peers = Vec::with_capacity(num);
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => {
peers.push(peer_id);
num -= 1;
if num == 0 {
@ -680,18 +686,24 @@ impl NetworkEventStream {
peers
}
/// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and
/// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established
/// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and
/// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established
/// session.
pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
let peer_id = match self.inner.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => peer_id,
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
_ => return None,
};
match self.inner.next().await {
Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => {
debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}");
Some(NetworkEvent::ActivePeerSession {
info: SessionInfo { peer_id: peer_id2, .. },
..
}) => {
debug_assert_eq!(
peer_id, peer_id2,
"PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
);
Some(peer_id)
}
_ => None,

View File

@ -40,6 +40,7 @@ use reth_eth_wire::{
};
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{
events::{PeerEvent, SessionInfo},
NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
};
use reth_network_p2p::{
@ -1050,55 +1051,81 @@ where
}
}
/// Handles session establishment and peer transactions initialization.
fn handle_peer_session(
&mut self,
info: SessionInfo,
messages: PeerRequestSender<PeerRequest<N>>,
) {
let SessionInfo { peer_id, client_version, version, .. } = info;
// Insert a new peer into the peerset.
let peer = PeerMetadata::<N>::new(
messages,
version,
client_version,
self.config.max_transactions_seen_by_peer_history,
);
let peer = match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
entry.insert(peer);
entry.into_mut()
}
Entry::Vacant(entry) => entry.insert(peer),
};
// Send a `NewPooledTransactionHashes` to the peer with up to
// `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
// transactions in the pool.
if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
return
}
// Get transactions to broadcast
let pooled_txs = self.pool.pooled_transactions_max(
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
);
if pooled_txs.is_empty() {
trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
return;
}
// Build and send transaction hashes message
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
for pooled_tx in pooled_txs {
peer.seen_transactions.insert(*pooled_tx.hash());
msg_builder.push_pooled(pooled_tx);
}
debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
let msg = msg_builder.build();
self.network.send_transactions_hashes(peer_id, msg);
}
/// Handles a received event related to common network events.
fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
match event_result {
NetworkEvent::SessionClosed { peer_id, .. } => {
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
// remove the peer
self.peers.remove(&peer_id);
self.transaction_fetcher.remove_peer(&peer_id);
}
NetworkEvent::SessionEstablished {
peer_id, client_version, messages, version, ..
} => {
// Insert a new peer into the peerset.
let peer = PeerMetadata::new(
messages,
version,
client_version,
self.config.max_transactions_seen_by_peer_history,
);
let peer = match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
entry.insert(peer);
entry.into_mut()
NetworkEvent::ActivePeerSession { info, messages } => {
// process active peer session and broadcast available transaction from the pool
self.handle_peer_session(info, messages);
}
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
let peer_id = info.peer_id;
// get messages from existing peer
let messages = match self.peers.get(&peer_id) {
Some(p) => p.request_tx.clone(),
None => {
debug!(target: "net::tx", ?peer_id, "No peer request sender found");
return;
}
Entry::Vacant(entry) => entry.insert(peer),
};
// Send a `NewPooledTransactionHashes` to the peer with up to
// `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
// transactions in the pool.
if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
return
}
let pooled_txs = self.pool.pooled_transactions_max(
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
);
if pooled_txs.is_empty() {
// do not send a message if there are no transactions in the pool
return
}
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
for pooled_tx in pooled_txs {
peer.seen_transactions.insert(*pooled_tx.hash());
msg_builder.push_pooled(pooled_tx);
}
let msg = msg_builder.build();
self.network.send_transactions_hashes(peer_id, msg);
self.handle_peer_session(info, messages);
}
_ => {}
}
@ -1987,27 +2014,12 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
transactions
.on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
}
NetworkEvent::PeerAdded(_peer_id) => continue,
NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
ev => {
error!("unexpected event {ev:?}")
}
@ -2073,28 +2085,13 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
NetworkEvent::ActivePeerSession { .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
transactions.on_network_event(ev);
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
_ => {
error!("unexpected event {ev:?}")
}
}
@ -2157,27 +2154,12 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
NetworkEvent::ActivePeerSession { .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
transactions.on_network_event(ev);
}
NetworkEvent::PeerAdded(_peer_id) => continue,
NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
ev => {
error!("unexpected event {ev:?}")
}
@ -2248,24 +2230,11 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
}),
NetworkEvent::PeerAdded(_peer_id) => continue,
NetworkEvent::ActivePeerSession { .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
transactions.on_network_event(ev);
}
NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
ev => {
error!("unexpected event {ev:?}")
}
@ -2495,17 +2464,18 @@ mod tests {
network.handle().update_sync_state(SyncState::Idle);
// mock a peer
let (tx, _rx) = mpsc::channel(1);
tx_manager.on_network_event(NetworkEvent::SessionEstablished {
let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
let session_info = SessionInfo {
peer_id,
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
client_version: Arc::from(""),
capabilities: Arc::new(vec![].into()),
messages: PeerRequestSender::new(peer_id, tx),
status: Arc::new(Default::default()),
version: EthVersion::Eth68,
});
};
let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
tx_manager
.on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
let mut propagate = vec![];
let mut factory = MockTransactionFactory::default();
let eip1559_tx = Arc::new(factory.create_eip1559());

View File

@ -15,7 +15,10 @@ use reth_network::{
BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider,
NetworkManager, PeersConfig,
};
use reth_network_api::{NetworkInfo, Peers, PeersInfo};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
NetworkInfo, Peers, PeersInfo,
};
use reth_network_p2p::{
headers::client::{HeadersClient, HeadersRequest},
sync::{NetworkSyncUpdater, SyncState},
@ -59,13 +62,15 @@ async fn test_establish_connections() {
let mut established = listener0.take(4);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionClosed { .. } | NetworkEvent::PeerRemoved(_) => {
NetworkEvent::Peer(PeerEvent::SessionClosed { .. } | PeerEvent::PeerRemoved(_)) => {
panic!("unexpected event")
}
NetworkEvent::SessionEstablished { peer_id, .. } => {
assert!(expected_connections.remove(&peer_id))
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
let SessionInfo { peer_id, .. } = info;
assert!(expected_connections.remove(&peer_id));
}
NetworkEvent::PeerAdded(peer_id) => {
NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => {
assert!(expected_peers.remove(&peer_id))
}
}
@ -496,11 +501,16 @@ async fn test_geth_disconnect() {
handle.add_peer(geth_peer_id, geth_socket);
match events.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id),
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => {
assert_eq!(peer_id, geth_peer_id)
}
_ => panic!("Expected a peer added event"),
}
if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
if let Some(NetworkEvent::Peer(PeerEvent::SessionEstablished(session_info))) =
events.next().await
{
let SessionInfo { peer_id, .. } = session_info;
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session established event");
@ -510,7 +520,9 @@ async fn test_geth_disconnect() {
handle.disconnect_peer(geth_peer_id);
// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await {
if let Some(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. })) =
events.next().await
{
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");

View File

@ -6,7 +6,10 @@ use reth_network::{
test_utils::{PeerConfig, Testnet},
NetworkEvent, NetworkEventListenerProvider,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
NetworkInfo, Peers,
};
use reth_provider::test_utils::NoopProvider;
#[tokio::test(flavor = "multi_thread")]
@ -28,10 +31,11 @@ async fn test_session_established_with_highest_version() {
while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
NetworkEvent::ActivePeerSession { info, .. } => {
let SessionInfo { peer_id, status, .. } = info;
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth68);
}
@ -66,10 +70,11 @@ async fn test_session_established_with_different_capability() {
while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
NetworkEvent::ActivePeerSession { info, .. } => {
let SessionInfo { peer_id, status, .. } = info;
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth66);
}

View File

@ -7,7 +7,7 @@ use alloy_primitives::{PrimitiveSignature as Signature, U256};
use futures::StreamExt;
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider};
use reth_network_api::PeersInfo;
use reth_network_api::{events::PeerEvent, PeersInfo};
use reth_primitives::TransactionSigned;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool};
@ -139,16 +139,17 @@ async fn test_sending_invalid_transactions() {
// await disconnect for bad tx spam
if let Some(ev) = peer1_events.next().await {
match ev {
NetworkEvent::SessionClosed { peer_id, .. } => {
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
assert_eq!(peer_id, *peer0.peer_id());
}
NetworkEvent::SessionEstablished { .. } => {
NetworkEvent::ActivePeerSession { .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished { .. }) => {
panic!("unexpected SessionEstablished event")
}
NetworkEvent::PeerAdded(_) => {
NetworkEvent::Peer(PeerEvent::PeerAdded(_)) => {
panic!("unexpected PeerAdded event")
}
NetworkEvent::PeerRemoved(_) => {
NetworkEvent::Peer(PeerEvent::PeerRemoved(_)) => {
panic!("unexpected PeerRemoved event")
}
}