From 7879389059b3cf7698c1cc70c4e0bb5f21e5f719 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 29 Nov 2022 20:57:40 +0100 Subject: [PATCH] chore(net): deny unused and cleanup (#289) --- crates/net/network/src/discovery.rs | 25 +- crates/net/network/src/fetch/mod.rs | 28 +- crates/net/network/src/lib.rs | 6 +- crates/net/network/src/manager.rs | 79 ++++-- crates/net/network/src/message.rs | 37 +-- crates/net/network/src/network.rs | 28 +- crates/net/network/src/peers/manager.rs | 29 +- crates/net/network/src/peers/reputation.rs | 10 - crates/net/network/src/session/active.rs | 7 +- crates/net/network/src/session/config.rs | 1 + crates/net/network/src/session/handle.rs | 6 +- crates/net/network/src/session/mod.rs | 315 ++++++++++----------- crates/net/network/src/state.rs | 19 +- crates/net/network/src/swarm.rs | 16 +- crates/net/network/src/transactions.rs | 32 ++- 15 files changed, 364 insertions(+), 274 deletions(-) diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 469710033..7affd9eb7 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -24,11 +24,11 @@ pub struct Discovery { /// Local ENR of the discovery service. local_enr: NodeRecord, /// Handler to interact with the Discovery v4 service - discv4: Discv4, + _discv4: Discv4, /// All KAD table updates from the discv4 service. discv4_updates: ReceiverStream, /// The initial config for the discv4 service - dsicv4_config: Discv4Config, + _dsicv4_config: Discv4Config, /// Events buffered until polled. queued_events: VecDeque, /// The handle to the spawned discv4 service @@ -57,9 +57,9 @@ impl Discovery { Ok(Self { local_enr, - discv4, + _discv4: discv4, discv4_updates, - dsicv4_config, + _dsicv4_config: dsicv4_config, _discv4_service, discovered_nodes: Default::default(), queued_events: Default::default(), @@ -71,23 +71,6 @@ impl Discovery { self.local_enr.id } - /// Manually adds an address to the set. - /// - /// This has the same effect as adding node discovered via network gossip. - pub(crate) fn add_node_address(&mut self, peer_id: PeerId, addr: SocketAddr) { - self.on_discv4_update(TableUpdate::Added(NodeRecord { - address: addr.ip(), - tcp_port: addr.port(), - udp_port: addr.port(), - id: peer_id, - })) - } - - /// Returns all nodes we know exist in the network. - pub fn nodes(&mut self) -> &HashMap { - &self.discovered_nodes - } - fn on_discv4_update(&mut self, update: TableUpdate) { match update { TableUpdate::Added(node) => { diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index acd2e6e62..6ec0b7d99 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -2,7 +2,7 @@ use crate::message::BlockRequest; use futures::StreamExt; -use reth_eth_wire::{BlockBody, GetBlockBodies}; +use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ error::{RequestError, RequestResult}, headers::client::HeadersRequest, @@ -161,18 +161,15 @@ impl StateFetcher { match req { DownloadRequest::GetBlockHeaders { request, response } => { - let inflight = Request { request, response }; + let inflight = Request { request: request.clone(), response }; self.inflight_headers_requests.insert(peer_id, inflight); - - unimplemented!("unify start types"); - - // BlockRequest::GetBlockHeaders(GetBlockHeaders { - // // TODO: this should be converted - // start_block: BlockHashOrNumber::Number(0), - // limit: request.limit, - // skip: 0, - // reverse: request.reverse, - // }) + let HeadersRequest { start, limit, reverse } = request; + BlockRequest::GetBlockHeaders(GetBlockHeaders { + start_block: start, + limit, + skip: 0, + reverse, + }) } DownloadRequest::GetBlockBodies { request, response } => { let inflight = Request { request: request.clone(), response }; @@ -307,9 +304,12 @@ impl PeerState { } } -/// A request that waits for a response from the network so it can send it back through the response -/// channel. +/// A request that waits for a response from the network, so it can send it back through the +/// response channel. struct Request { + /// The issued request object + /// TODO: this can be attached to the response in error case + #[allow(unused)] request: Req, response: oneshot::Sender, } diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 00fd644c7..bf82409c9 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -4,8 +4,6 @@ no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -// TODO remove later -#![allow(dead_code, clippy::too_many_arguments)] //! reth P2P networking. //! @@ -25,11 +23,11 @@ mod listener; mod manager; mod message; mod network; -mod peers; +pub mod peers; mod session; mod state; mod swarm; -mod transactions; +pub mod transactions; pub use config::NetworkConfig; pub use fetch::FetchClient; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 7a266fe23..372a33ed5 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -23,11 +23,12 @@ use crate::{ listener::ConnectionListener, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, network::{NetworkHandle, NetworkHandleMessage}, - peers::{PeersManager, ReputationChangeKind}, + peers::{PeersHandle, PeersManager, ReputationChangeKind}, session::SessionManager, state::NetworkState, swarm::{Swarm, SwarmEvent}, transactions::NetworkTransactionEvent, + FetchClient, }; use futures::{Future, StreamExt}; use parking_lot::Mutex; @@ -36,7 +37,7 @@ use reth_eth_wire::{ DisconnectReason, }; use reth_interfaces::provider::BlockProvider; -use reth_primitives::PeerId; +use reth_primitives::{PeerId, H256}; use std::{ net::SocketAddr, pin::Pin, @@ -48,7 +49,7 @@ use std::{ }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; /// Manages the _entire_ state of the network. /// @@ -84,8 +85,6 @@ pub struct NetworkManager { from_handle_rx: UnboundedReceiverStream, /// Handles block imports according to the `eth` protocol. block_import: Box, - /// The address of this node that listens for incoming connections. - listener_address: Arc>, /// All listeners for [`Network`] events. event_listeners: NetworkEventListeners, /// Sender half to send events to the [`TransactionsManager`] task, if configured. @@ -142,7 +141,7 @@ where let num_active_peers = Arc::new(AtomicUsize::new(0)); let handle = NetworkHandle::new( Arc::clone(&num_active_peers), - Arc::clone(&listener_address), + listener_address, to_manager_tx, local_peer_id, peers_handle, @@ -154,7 +153,6 @@ where handle, from_handle_rx: UnboundedReceiverStream::new(from_handle_rx), block_import, - listener_address, event_listeners: Default::default(), to_transactions: None, num_active_peers, @@ -171,6 +169,11 @@ where self.swarm.listener().local_address() } + /// Returns the configured genesis hash + pub fn genesis_hash(&self) -> H256 { + self.swarm.state().genesis_hash() + } + /// How many peers we're currently connected to. pub fn num_connected_peers(&self) -> usize { self.swarm.state().num_connected_peers() @@ -188,6 +191,20 @@ where &self.handle } + /// Returns a new [`PeersHandle`] that can be cloned and shared. + /// + /// The [`PeersHandle`] can be used to interact with the network's peer set. + pub fn peers_handle(&self) -> PeersHandle { + self.swarm.state().peers().handle() + } + + /// Returns a new [`FetchClient`] that can be cloned and shared. + /// + /// The [`FetchClient`] is the entrypoint for sending requests to the network. + pub fn fetch_client(&self) -> FetchClient { + self.swarm.state().fetch_client() + } + /// Event hook for an unexpected message from the peer. fn on_invalid_message( &mut self, @@ -302,7 +319,9 @@ where PeerMessage::SendTransactions(_) => { unreachable!("Not emitted by session") } - PeerMessage::Other(_) => {} + PeerMessage::Other(other) => { + error!(target : "net", message_id=%other.id, "Ignoring unsupported message"); + } } } @@ -339,6 +358,9 @@ where NetworkHandleMessage::ReputationChange(peer_id, kind) => { self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind); } + NetworkHandleMessage::FetchClient(tx) => { + let _ = tx.send(self.fetch_client()); + } } } } @@ -387,11 +409,11 @@ where SwarmEvent::TcpListenerError(err) => { trace!(target : "net", ?err, "TCP connection error."); } - SwarmEvent::IncomingTcpConnection { remote_addr, .. } => { - trace!(target : "net",?remote_addr, "Incoming connection"); + SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { + trace!(target : "net", ?session_id, ?remote_addr, "Incoming connection"); } - SwarmEvent::OutgoingTcpConnection { remote_addr } => { - trace!(target : "net", ?remote_addr,"Starting outbound connection."); + SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { + trace!(target : "net", ?remote_addr, ?peer_id, "Starting outbound connection."); } SwarmEvent::SessionEstablished { peer_id, @@ -410,7 +432,10 @@ where ); if direction.is_incoming() { - this.swarm.state_mut().peers_mut().on_active_session(peer_id, remote_addr); + this.swarm + .state_mut() + .peers_mut() + .on_active_inbound_session(peer_id, remote_addr); } this.event_listeners.send(NetworkEvent::SessionEstablished { @@ -440,14 +465,36 @@ where this.event_listeners.send(NetworkEvent::SessionClosed { peer_id }); } - SwarmEvent::IncomingPendingSessionClosed { .. } => {} - SwarmEvent::OutgoingPendingSessionClosed { peer_id, .. } => { + SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { + warn!( + target : "net", + ?remote_addr, + ?error, + "Incoming pending session failed" + ); + this.swarm.state_mut().peers_mut().on_closed_incoming_pending_session() + } + SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { + warn!( + target : "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing pending session failed" + ); this.swarm .state_mut() .peers_mut() .apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); } - SwarmEvent::OutgoingConnectionError { peer_id, .. } => { + SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { + warn!( + target : "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing connection error" + ); this.swarm .state_mut() .peers_mut() diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 35b0dd11b..2c7a31e24 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -5,7 +5,7 @@ use futures::FutureExt; use reth_eth_wire::{ - capability::CapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, + capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, @@ -53,7 +53,8 @@ pub enum PeerMessage { /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message - Other(CapabilityMessage), + #[allow(unused)] + Other(RawCapabilityMessage), } /// Request Variants that only target block related data. @@ -65,32 +66,6 @@ pub enum BlockRequest { GetBlockBodies(GetBlockBodies), } -/// All Request variants of an [`EthMessage`] -/// -/// Note: These variants come without a request ID, as it's expected that the peer session will -/// manage those -#[derive(Debug, Clone)] -#[allow(missing_docs)] -#[allow(clippy::enum_variant_names)] -pub enum EthRequest { - GetBlockHeaders(GetBlockHeaders), - GetBlockBodies(GetBlockBodies), - GetPooledTransactions(GetPooledTransactions), - GetNodeData(GetNodeData), - GetReceipts(GetReceipts), -} - -/// Corresponding Response variants for [`EthRequest`] -#[derive(Debug, Clone)] -#[allow(missing_docs)] -pub enum EthResponse { - BlockHeaders(BlockHeaders), - BlockBodies(BlockBodies), - PooledTransactions(PooledTransactions), - NodeData(NodeData), - Receipts(Receipts), -} - /// Protocol related request messages that expect a response #[derive(Debug)] #[allow(clippy::enum_variant_names)] @@ -265,6 +240,7 @@ impl PeerResponseResult { } /// Returns whether this result is an error. + #[allow(unused)] pub fn is_err(&self) -> bool { match self { PeerResponseResult::BlockHeaders(res) => res.is_err(), @@ -292,4 +268,9 @@ impl PeerRequestSender { pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError> { self.to_session_tx.try_send(req) } + + /// Returns the peer id of the remote peer. + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 3c2b0926b..e77f54828 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -3,6 +3,7 @@ use crate::{ manager::NetworkEvent, message::PeerRequest, peers::{PeersHandle, ReputationChangeKind}, + FetchClient, }; use parking_lot::Mutex; use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, SharedTransactions}; @@ -14,7 +15,7 @@ use std::{ Arc, }, }; -use tokio::sync::{mpsc, mpsc::UnboundedSender}; +use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; /// A _shareable_ network frontend. Used to interact with the network. @@ -64,6 +65,13 @@ impl NetworkHandle { &self.inner.local_peer_id } + /// Returns the [`PeersHandle`] that can be cloned and shared. + /// + /// The [`PeersHandle`] can be used to interact with the network's peer set. + pub fn peers_handle(&self) -> &PeersHandle { + &self.inner.peers + } + fn manager(&self) -> &UnboundedSender { &self.inner.to_manager_tx } @@ -75,6 +83,15 @@ impl NetworkHandle { UnboundedReceiverStream::new(rx) } + /// Returns a new [`FetchClient`] that can be cloned and shared. + /// + /// The [`FetchClient`] is the entrypoint for sending requests to the network. + pub async fn fetch_client(&self) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx)); + rx.await + } + /// Returns the mode of the network, either pow, or pos pub fn mode(&self) -> &NetworkMode { &self.inner.network_mode @@ -85,6 +102,13 @@ impl NetworkHandle { let _ = self.inner.to_manager_tx.send(msg); } + /// Announce a block over devp2p + /// + /// Caution: in PoS this is a noop, since new block propagation will happen over devp2p + pub fn announce_block(&self, block: NewBlock, hash: H256) { + self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash)) + } + /// Sends a message to the [`NetworkManager`] to add a peer to the known set pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) { let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::AddPeerAddress(peer, addr)); @@ -154,4 +178,6 @@ pub(crate) enum NetworkHandleMessage { }, /// Apply a reputation change to the given peer. ReputationChange(PeerId, ReputationChangeKind), + /// Returns the client that can be used to interact with the network. + FetchClient(oneshot::Sender), } diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 5d60b68f2..6d4378fa8 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -90,13 +90,32 @@ impl PeersManager { PeersHandle { manager_tx: self.manager_tx.clone() } } + /// Invoked when a new _incoming_ tcp connection is accepted. + /// + /// Returns an Err with the configured limit with the number of accepted incoming connections + /// exceeded the configured limit. + pub(crate) fn on_inbound_pending_session(&mut self) -> Result<(), usize> { + if !self.connection_info.has_in_capacity() { + return Err(self.connection_info.max_inbound) + } + + // keep track of new connection + self.connection_info.inc_in(); + Ok(()) + } + + /// Invoked when a pending session was closed. + pub(crate) fn on_closed_incoming_pending_session(&mut self) { + self.connection_info.decr_in() + } + /// Called when a new _incoming_ active session was established to the given peer. /// /// This will update the state of the peer if not yet tracked. /// /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will /// be scheduled. - pub(crate) fn on_active_session(&mut self, peer_id: PeerId, addr: SocketAddr) { + pub(crate) fn on_active_inbound_session(&mut self, peer_id: PeerId, addr: SocketAddr) { match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { let value = entry.get_mut(); @@ -110,9 +129,6 @@ impl PeersManager { entry.insert(Peer::with_state(addr, PeerConnectionState::In)); } } - - // keep track of new connection - self.connection_info.inc_in(); } /// Apply the corresponding reputation change to the given peer @@ -282,6 +298,11 @@ impl ConnectionInfo { self.num_outbound < self.max_outbound } + /// Returns `true` if there's still capacity for a new incoming connection. + fn has_in_capacity(&self) -> bool { + self.num_inbound < self.max_inbound + } + fn decr_state(&mut self, state: PeerConnectionState) { match state { PeerConnectionState::Idle => {} diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network/src/peers/reputation.rs index ac1d0d6e8..9bfb93a98 100644 --- a/crates/net/network/src/peers/reputation.rs +++ b/crates/net/network/src/peers/reputation.rs @@ -106,16 +106,6 @@ pub(crate) struct ReputationChange(Reputation); // === impl ReputationChange === impl ReputationChange { - /// Apply no reputation change. - pub(crate) const fn none() -> Self { - Self(0) - } - - /// Reputation change for a peer that dropped the connection. - pub(crate) const fn dropped() -> Self { - Self(REMOTE_DISCONNECT_REPUTATION_CHANGE) - } - /// Helper type for easier conversion #[inline] pub(crate) fn as_i32(self) -> Reputation { diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 8f770d2f8..02f3301ed 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -42,6 +42,7 @@ use tracing::{error, warn}; /// - incoming commands from the [`SessionsManager`] /// - incoming requests via the request channel /// - responses for handled ETH requests received from the remote peer. +#[allow(unused)] pub(crate) struct ActiveSession { /// Keeps track of request ids. pub(crate) next_id: u64, @@ -216,7 +217,9 @@ impl ActiveSession { PeerMessage::ReceivedTransaction(_) => { unreachable!("Not emitted by network") } - PeerMessage::Other(_) => {} + PeerMessage::Other(other) => { + error!(target : "net::session", message_id=%other.id, "Ignoring unsupported message"); + } } } @@ -432,6 +435,7 @@ pub(crate) struct ReceivedRequest { /// Receiver half of the channel that's supposed to receive the proper response. rx: PeerResponse, /// Timestamp when we read this msg from the wire. + #[allow(unused)] received: Instant, } @@ -463,6 +467,7 @@ impl From for OutgoingMessage { #[cfg(test)] mod tests { + #![allow(dead_code)] use super::*; use crate::session::{ config::REQUEST_TIMEOUT, handle::PendingSessionEvent, start_pending_incoming_session, diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs index f6ce7b757..2b96bdb48 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network/src/session/config.rs @@ -155,6 +155,7 @@ impl SessionCounter { } } + #[allow(unused)] pub(crate) fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> { Self::ensure(self.pending_outbound, self.limits.max_pending_outbound) } diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 0aaa67244..1f2afcbaa 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -23,7 +23,7 @@ use tokio::{ #[derive(Debug)] pub(crate) struct PendingSessionHandle { /// Can be used to tell the session to disconnect the connection/abort the handshake process. - pub(crate) disconnect_tx: oneshot::Sender<()>, + pub(crate) _disconnect_tx: oneshot::Sender<()>, /// The direction of the session pub(crate) direction: Direction, } @@ -33,6 +33,7 @@ pub(crate) struct PendingSessionHandle { /// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can /// be performed: chain synchronization, block propagation and transaction exchange. #[derive(Debug)] +#[allow(unused)] pub(crate) struct ActiveSessionHandle { /// The direction of the session pub(crate) direction: Direction, @@ -65,8 +66,6 @@ impl ActiveSessionHandle { /// A session starts with a `Handshake`, followed by a `Hello` message which #[derive(Debug)] pub(crate) enum PendingSessionEvent { - /// Initial handshake step was successful - SuccessfulHandshake { remote_addr: SocketAddr, session_id: SessionId }, /// Represents a successful `Hello` and `Status` exchange: Established { session_id: SessionId, @@ -134,6 +133,7 @@ pub(crate) enum ActiveSessionMessage { message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. + #[allow(unused)] InvalidMessage { /// Identifier of the remote peer. peer_id: PeerId, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 78bf01749..dc6bc699e 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -43,6 +43,7 @@ mod config; mod handle; use crate::session::config::SessionCounter; pub use config::SessionsConfig; +use reth_ecies::util::pk2id; /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] @@ -59,8 +60,6 @@ pub(crate) struct SessionManager { request_timeout: Duration, /// The secret key used for authenticating sessions. secret_key: SecretKey, - /// The node id of node - peer_id: PeerId, /// The `Status` message to send to peers. status: Status, /// THe `Hello` message to send to peers. @@ -105,7 +104,7 @@ impl SessionManager { let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); let pk = secret_key.public_key(SECP256K1); - let peer_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]); + let peer_id = pk2id(&pk); // TODO: make sure this is the right place to put these builders - maybe per-Network rather // than per-Session? @@ -118,7 +117,6 @@ impl SessionManager { counter: SessionCounter::new(config.limits), request_timeout: config.request_timeout, secret_key, - peer_id, status, hello, fork_filter, @@ -181,7 +179,8 @@ impl SessionManager { self.fork_filter.clone(), )); - let handle = PendingSessionHandle { disconnect_tx, direction: Direction::Incoming }; + let handle = + PendingSessionHandle { _disconnect_tx: disconnect_tx, direction: Direction::Incoming }; self.pending_sessions.insert(session_id, handle); self.counter.inc_pending_inbound(); Ok(session_id) @@ -204,8 +203,10 @@ impl SessionManager { self.fork_filter.clone(), )); - let handle = - PendingSessionHandle { disconnect_tx, direction: Direction::Outgoing(remote_peer_id) }; + let handle = PendingSessionHandle { + _disconnect_tx: disconnect_tx, + direction: Direction::Outgoing(remote_peer_id), + }; self.pending_sessions.insert(session_id, handle); self.counter.inc_pending_outbound(); } @@ -289,160 +290,138 @@ impl SessionManager { } // Poll the pending session event stream - loop { - let event = match self.pending_session_rx.poll_next_unpin(cx) { - Poll::Pending => break, - Poll::Ready(None) => unreachable!("Manager holds both channel halves."), - Poll::Ready(Some(event)) => event, - }; - match event { - PendingSessionEvent::SuccessfulHandshake { remote_addr, session_id } => { - trace!( - target : "net::session", - ?session_id, - ?remote_addr, - "successful handshake" - ); - } - PendingSessionEvent::Established { - session_id, + let event = match self.pending_session_rx.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => unreachable!("Manager holds both channel halves."), + Poll::Ready(Some(event)) => event, + }; + return match event { + PendingSessionEvent::Established { + session_id, + remote_addr, + peer_id, + capabilities, + conn, + status, + direction, + } => { + // move from pending to established. + self.remove_pending_session(&session_id); + + let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer); + + let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); + + let messages = PeerRequestSender { peer_id, to_session_tx }; + + let session = ActiveSession { + next_id: 0, + remote_peer_id: peer_id, remote_addr, - peer_id, - capabilities, + remote_capabilities: Arc::clone(&capabilities), + session_id, + commands_rx: ReceiverStream::new(commands_rx), + to_session: self.active_session_tx.clone(), + request_tx: ReceiverStream::new(messages_rx).fuse(), + inflight_requests: Default::default(), conn, - status, + queued_outgoing: Default::default(), + received_requests: Default::default(), + timeout_interval: tokio::time::interval(self.request_timeout), + request_timeout: self.request_timeout, + }; + + self.spawn(session); + + let handle = ActiveSessionHandle { direction, - } => { - // move from pending to established. - self.remove_pending_session(&session_id); + session_id, + remote_id: peer_id, + established: Instant::now(), + capabilities: Arc::clone(&capabilities), + commands_to_session, + }; - let (commands_to_session, commands_rx) = - mpsc::channel(self.session_command_buffer); + self.active_sessions.insert(peer_id, handle); + self.counter.inc_active(&direction); - let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); - - let messages = PeerRequestSender { peer_id, to_session_tx }; - - let session = ActiveSession { - next_id: 0, - remote_peer_id: peer_id, - remote_addr, - remote_capabilities: Arc::clone(&capabilities), - session_id, - commands_rx: ReceiverStream::new(commands_rx), - to_session: self.active_session_tx.clone(), - request_tx: ReceiverStream::new(messages_rx).fuse(), - inflight_requests: Default::default(), - conn, - queued_outgoing: Default::default(), - received_requests: Default::default(), - timeout_interval: tokio::time::interval(self.request_timeout), - request_timeout: self.request_timeout, - }; - - self.spawn(session); - - let handle = ActiveSessionHandle { - direction, - session_id, - remote_id: peer_id, - established: Instant::now(), - capabilities: Arc::clone(&capabilities), - commands_to_session, - }; - - self.active_sessions.insert(peer_id, handle); - self.counter.inc_active(&direction); - - return Poll::Ready(SessionEvent::SessionEstablished { - peer_id, - remote_addr, - capabilities, - status, - messages, - direction, - }) - } - PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { - trace!( - target : "net::session", - ?session_id, - ?remote_addr, - "disconnected pending session" - ); - self.remove_pending_session(&session_id); - return match direction { - Direction::Incoming => { - Poll::Ready(SessionEvent::IncomingPendingSessionClosed { - remote_addr, - error, - }) - } - Direction::Outgoing(peer_id) => { - Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error, - }) - } + Poll::Ready(SessionEvent::SessionEstablished { + peer_id, + remote_addr, + capabilities, + status, + messages, + direction, + }) + } + PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { + trace!( + target : "net::session", + ?session_id, + ?remote_addr, + "disconnected pending session" + ); + self.remove_pending_session(&session_id); + match direction { + Direction::Incoming => { + Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error, + }) + } + Direction::Outgoing(peer_id) => { + Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { + remote_addr, + peer_id, + error, + }) } } - PendingSessionEvent::OutgoingConnectionError { - remote_addr, - session_id, - peer_id, - error, - } => { - trace!( - target : "net::session", - ?error, - ?session_id, - ?remote_addr, - ?peer_id, - "connection refused" - ); - self.remove_pending_session(&session_id); - return Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error: None, - }) - } - PendingSessionEvent::EciesAuthError { - remote_addr, - session_id, - error, - direction, - } => { - self.remove_pending_session(&session_id); - warn!( - target : "net::session", - ?error, - ?session_id, - ?remote_addr, - "ecies auth failed" - ); - self.remove_pending_session(&session_id); - return match direction { - Direction::Incoming => { - Poll::Ready(SessionEvent::IncomingPendingSessionClosed { - remote_addr, - error: None, - }) - } - Direction::Outgoing(peer_id) => { - Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error: None, - }) - } + } + PendingSessionEvent::OutgoingConnectionError { + remote_addr, + session_id, + peer_id, + error, + } => { + trace!( + target : "net::session", + ?error, + ?session_id, + ?remote_addr, + ?peer_id, + "connection refused" + ); + self.remove_pending_session(&session_id); + Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error }) + } + PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => { + self.remove_pending_session(&session_id); + warn!( + target : "net::session", + ?error, + ?session_id, + ?remote_addr, + "ecies auth failed" + ); + self.remove_pending_session(&session_id); + match direction { + Direction::Incoming => { + Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error: None, + }) + } + Direction::Outgoing(peer_id) => { + Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { + remote_addr, + peer_id, + error: None, + }) } } } } - - Poll::Pending } } @@ -501,6 +480,22 @@ pub(crate) enum SessionEvent { Disconnected { peer_id: PeerId, remote_addr: SocketAddr }, } +/// The direction of the connection. +#[derive(Debug, Copy, Clone)] +pub(crate) enum Direction { + /// Incoming connection. + Incoming, + /// Outgoing connection to a specific node. + Outgoing(PeerId), +} + +impl Direction { + /// Returns `true` if this an incoming connection. + pub(crate) fn is_incoming(&self) -> bool { + matches!(self, Direction::Incoming) + } +} + /// The error thrown when the max configured limit has been reached and no more connections are /// accepted. #[derive(Debug, Clone, thiserror::Error)] @@ -510,6 +505,7 @@ pub struct ExceedsSessionLimit(pub(crate) u32); /// Starts the authentication process for a connection initiated by a remote peer. /// /// This will wait for the _incoming_ handshake request and answer it. +#[allow(clippy::too_many_arguments)] pub(crate) async fn start_pending_incoming_session( disconnect_rx: oneshot::Receiver<()>, session_id: SessionId, @@ -538,6 +534,7 @@ pub(crate) async fn start_pending_incoming_session( /// Starts the authentication process for a connection initiated by a remote peer. #[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")] +#[allow(clippy::too_many_arguments)] async fn start_pending_outbound_session( disconnect_rx: oneshot::Receiver<()>, events: mpsc::Sender, @@ -578,23 +575,8 @@ async fn start_pending_outbound_session( .await } -/// The direction of the connection. -#[derive(Debug, Copy, Clone)] -pub(crate) enum Direction { - /// Incoming connection. - Incoming, - /// Outgoing connection to a specific node. - Outgoing(PeerId), -} - -impl Direction { - /// Returns `true` if this an incoming connection. - pub(crate) fn is_incoming(&self) -> bool { - matches!(self, Direction::Incoming) - } -} - /// Authenticates a session +#[allow(clippy::too_many_arguments)] async fn authenticate( disconnect_rx: oneshot::Receiver<()>, events: mpsc::Sender, @@ -672,6 +654,7 @@ async fn authenticate( /// Authenticate the stream via handshake /// /// On Success return the authenticated stream as [`PendingSessionEvent`] +#[allow(clippy::too_many_arguments)] async fn authenticate_stream( stream: UnauthedP2PStream>, session_id: SessionId, diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 93b58e2f1..ea8d644c1 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -9,6 +9,7 @@ use crate::{ PeerResponseResult, }, peers::{PeerAction, PeersManager}, + FetchClient, }; use reth_eth_wire::{ capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, @@ -85,6 +86,21 @@ where &mut self.peers_manager } + /// Returns access to the [`PeersManager`] + pub(crate) fn peers(&self) -> &PeersManager { + &self.peers_manager + } + + /// Returns a new [`FetchClient`] + pub(crate) fn fetch_client(&self) -> FetchClient { + self.state_fetcher.client() + } + + /// How many peers we're currently connected to. + pub fn genesis_hash(&self) -> H256 { + self.genesis_hash + } + /// How many peers we're currently connected to. pub fn num_connected_peers(&self) -> usize { self.connected_peers.len() @@ -381,10 +397,11 @@ where /// Tracks the state of a Peer. /// /// For example known blocks,so we can decide what to announce. -pub struct ConnectedPeer { +pub(crate) struct ConnectedPeer { /// Best block of the peer. pub(crate) best_hash: H256, /// The capabilities of the connected peer. + #[allow(unused)] pub(crate) capabilities: Arc, /// A communication channel directly to the session task. pub(crate) request_tx: PeerRequestSender, diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 49abfa7d5..2ee2d52ba 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -19,7 +19,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tracing::warn; +use tracing::{trace, warn}; /// Contains the connectivity related state of the network. /// @@ -99,6 +99,7 @@ where pub(crate) fn listener(&self) -> &ConnectionListener { &self.incoming } + /// Mutable access to the [`SessionManager`]. pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager { &mut self.sessions @@ -171,12 +172,19 @@ where return Some(SwarmEvent::TcpListenerClosed { remote_addr: address }) } ListenerEvent::Incoming { stream, remote_addr } => { + if let Err(limit) = self.state_mut().peers_mut().on_inbound_pending_session() { + // We currently don't have additional capacity to establish a session from an + // incoming connection, so we drop the connection. + trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; dropping connection"); + return None + } + match self.sessions.on_incoming(stream, remote_addr) { Ok(session_id) => { return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr }) } Err(err) => { - warn!(?err, "Incoming connection rejected"); + warn!(target: "net", ?err, "Incoming connection rejected"); } } } @@ -188,7 +196,8 @@ where fn on_state_action(&mut self, event: StateAction) -> Option { match event { StateAction::Connect { remote_addr, peer_id } => { - self.sessions.dial_outbound(remote_addr, peer_id); + self.dial_outbound(remote_addr, peer_id); + return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id }) } StateAction::Disconnect { peer_id, reason } => { self.sessions.disconnect(peer_id, reason); @@ -301,6 +310,7 @@ pub(crate) enum SwarmEvent { /// An outbound connection is initiated. OutgoingTcpConnection { /// Address of the remote peer. + peer_id: PeerId, remote_addr: SocketAddr, }, SessionEstablished { diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index adfb05f82..8070a3dee 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -43,6 +43,19 @@ pub struct TransactionsHandle { manager_tx: mpsc::UnboundedSender, } +// === impl TransactionsHandle === + +impl TransactionsHandle { + fn send(&self, cmd: TransactionsCommand) { + let _ = self.manager_tx.send(cmd); + } + + /// Manually propagate the transaction that belongs to the hash. + pub fn propagate(&self, hash: TxHash) { + self.send(TransactionsCommand::PropagateHash(hash)) + } +} + /// Manages transactions on top of the p2p network. /// /// This can be spawned to another task and is supposed to be run as background service while @@ -248,6 +261,15 @@ where } } + /// Handles a command received from a detached [`TransactionsHandle`] + fn on_command(&mut self, cmd: TransactionsCommand) { + match cmd { + TransactionsCommand::PropagateHash(hash) => { + self.on_new_transactions(std::iter::once(hash)) + } + } + } + /// Handles a received event related to common network events. fn on_network_event(&mut self, event: NetworkEvent) { match event { @@ -355,6 +377,11 @@ where this.on_network_event(event); } + // drain network/peer related events + while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { + this.on_command(cmd); + } + // drain incoming transaction events while let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) { this.on_network_tx_event(event); @@ -424,11 +451,12 @@ struct Peer { /// Commands to send to the [`TransactionManager`] enum TransactionsCommand { - Propagate(H256), + PropagateHash(H256), } -/// All events related to transactions emitted by the network +/// All events related to transactions emitted by the network. #[derive(Debug)] +#[allow(missing_docs)] pub enum NetworkTransactionEvent { /// Received list of transactions to the given peer. IncomingTransactions { peer_id: PeerId, msg: Transactions },