diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index ddede16b3..3ee3c9803 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -72,6 +72,12 @@ impl Capabilities { self.inner } + /// Whether the peer supports `eth` sub-protocol. + #[inline] + pub fn supports_eth(&self) -> bool { + self.eth_67 || self.eth_66 + } + /// Whether this peer supports eth v66 protocol. #[inline] pub fn supports_eth_v66(&self) -> bool { diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index c5032b81f..a73f08b7d 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -1,7 +1,7 @@ use crate::{peers::PeersConfig, session::SessionsConfig}; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT}; use reth_eth_wire::forkid::ForkId; -use reth_primitives::Chain; +use reth_primitives::{Chain, H256}; use secp256k1::SecretKey; use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, @@ -29,6 +29,8 @@ pub struct NetworkConfig { pub fork_id: Option, /// The id of the network pub chain: Chain, + /// Genesis hash of the network + pub genesis_hash: H256, } // === impl NetworkConfig === @@ -58,6 +60,7 @@ impl NetworkConfig { } /// Builder for [`NetworkConfig`](struct.NetworkConfig.html). +#[allow(missing_docs)] pub struct NetworkConfigBuilder { /// The client type that can interact with the chain. client: Arc, @@ -73,10 +76,9 @@ pub struct NetworkConfigBuilder { peers_config: Option, /// How to configure the sessions manager sessions_config: Option, - fork_id: Option, - chain: Chain, + genesis_hash: H256, } // === impl NetworkConfigBuilder === @@ -94,9 +96,16 @@ impl NetworkConfigBuilder { sessions_config: None, fork_id: None, chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet), + genesis_hash: Default::default(), } } + /// Sets the genesis hash for the network. + pub fn genesis_hash(mut self, genesis_hash: H256) -> Self { + self.genesis_hash = genesis_hash; + self + } + /// Consumes the type and creates the actual [`NetworkConfig`] pub fn build(self) -> NetworkConfig { let Self { @@ -109,6 +118,7 @@ impl NetworkConfigBuilder { sessions_config, fork_id, chain, + genesis_hash, } = self; NetworkConfig { client, @@ -124,6 +134,7 @@ impl NetworkConfigBuilder { sessions_config: sessions_config.unwrap_or_default(), fork_id, chain, + genesis_hash, } } } diff --git a/crates/net/network/src/fetch.rs b/crates/net/network/src/fetch.rs index 3113c9703..572154daa 100644 --- a/crates/net/network/src/fetch.rs +++ b/crates/net/network/src/fetch.rs @@ -1,6 +1,9 @@ //! Fetch data from the network. -use crate::{message::RequestResult, NodeId}; +use crate::{ + message::{BlockRequest, RequestResult}, + NodeId, +}; use futures::StreamExt; use reth_eth_wire::{BlockBody, EthMessage}; use reth_interfaces::p2p::headers::client::HeadersRequest; @@ -34,13 +37,13 @@ pub struct StateFetcher { impl StateFetcher { /// Invoked when connected to a new peer. - pub(crate) fn new_connected_peer( - &mut self, - _node_id: NodeId, - _best_hash: H256, - _best_number: U256, - ) { - } + pub(crate) fn new_connected_peer(&mut self, _node_id: NodeId, _best_hash: H256) {} + + /// Invoked when an active session was closed. + pub(crate) fn on_session_closed(&mut self, _peer: &NodeId) {} + + /// Invoked when an active session is about to be disconnected. + pub(crate) fn on_pending_disconnect(&mut self, _peer: &NodeId) {} /// Returns the next action to return fn poll_action(&mut self) -> Option { @@ -94,9 +97,19 @@ impl StateFetcher { /// Called on a `GetBlockHeaders` response from a peer pub(crate) fn on_block_headers_response( &mut self, - _from: NodeId, - _msg: RequestResult>, - ) { + _peer: NodeId, + _res: RequestResult>, + ) -> Option { + None + } + + /// Called on a `GetBlockBodies` response from a peer + pub(crate) fn on_block_bodies_response( + &mut self, + _peer: NodeId, + _res: RequestResult>, + ) -> Option { + None } /// Returns a new [`HeadersDownloader`] that can send requests to this type @@ -184,3 +197,15 @@ pub(crate) enum FetchAction { request: EthMessage, }, } + +/// Outcome of a processed response. +/// +/// Returned after processing a response. +#[derive(Debug)] +pub(crate) enum BlockResponseOutcome { + /// Continue with another request to the peer. + Request(NodeId, BlockRequest), + /// How to handle a bad response + // TODO this should include some form of reputation change + BadResponse(NodeId), +} diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index f03dc5d89..9aa2b9290 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -111,6 +111,7 @@ where listener_addr, peers_config, sessions_config, + genesis_hash, .. } = config; @@ -125,7 +126,7 @@ where let local_node_id = discovery.local_id(); let sessions = SessionManager::new(secret_key, sessions_config); - let state = NetworkState::new(client, discovery, peers_manger); + let state = NetworkState::new(client, discovery, peers_manger, genesis_hash); let swarm = Swarm::new(incoming, sessions, state); diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 2ba0ce938..5c44ac170 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -5,14 +5,16 @@ use futures::FutureExt; use reth_eth_wire::{ - BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, - GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions, Receipts, Transactions, + BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, + GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions, + Receipts, Transactions, }; use std::task::{ready, Context, Poll}; use crate::NodeId; use reth_eth_wire::capability::CapabilityMessage; -use tokio::sync::{mpsc, oneshot}; +use reth_primitives::{Header, Receipt, TransactionSigned}; +use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot}; /// Result alias for result of a request. pub type RequestResult = Result; @@ -58,6 +60,15 @@ pub enum PeerMessage { Other(CapabilityMessage), } +/// Request Variants that only target block related data. +#[derive(Debug, Clone)] +#[allow(missing_docs)] +#[allow(clippy::enum_variant_names)] +pub enum BlockRequest { + GetBlockHeaders(GetBlockHeaders), + GetBlockBodies(GetBlockBodies), +} + /// All Request variants of an [`EthMessage`] /// /// Note: These variants come without a request ID, as it's expected that the peer session will @@ -99,8 +110,8 @@ pub enum PeerRequest { /// /// The response should be sent through the channel. GetBlockBodies { - request: GetBlockHeaders, - response: oneshot::Sender>, + request: GetBlockBodies, + response: oneshot::Sender>, }, /// Request pooled transactions from the peer. /// @@ -133,12 +144,15 @@ pub enum PeerResponse { impl PeerResponse { /// Polls the type to completion. - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + pub(crate) fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { macro_rules! poll_request { ($response:ident, $item:ident, $cx:ident) => { match ready!($response.poll_unpin($cx)) { - Ok(res) => res.map(EthResponse::$item), - Err(err) => Err(err.into()), + Ok(res) => Ok(PeerResponseResult::$item(res.map(|item| item.0))), + Err(err) => Err(err), } }; } @@ -164,6 +178,32 @@ impl PeerResponse { } } +/// All response variants for [`PeerResponse`] +#[derive(Debug)] +#[allow(missing_docs)] +pub enum PeerResponseResult { + BlockHeaders(RequestResult>), + BlockBodies(RequestResult>), + PooledTransactions(RequestResult>), + NodeData(RequestResult>), + Receipts(RequestResult>>), +} + +// === impl PeerResponseResult === + +impl PeerResponseResult { + /// Returns whether this result is an error. + pub fn is_err(&self) -> bool { + match self { + PeerResponseResult::BlockHeaders(res) => res.is_err(), + PeerResponseResult::BlockBodies(res) => res.is_err(), + PeerResponseResult::PooledTransactions(res) => res.is_err(), + PeerResponseResult::NodeData(res) => res.is_err(), + PeerResponseResult::Receipts(res) => res.is_err(), + } + } +} + /// A Cloneable connection for sending _requests_ directly to the session of a peer. #[derive(Debug, Clone)] pub struct PeerRequestSender { @@ -172,3 +212,12 @@ pub struct PeerRequestSender { /// The Sender half connected to a session. pub(crate) to_session_tx: mpsc::Sender, } + +// === impl PeerRequestSender === + +impl PeerRequestSender { + /// Attempts to immediately send a message on this Sender + pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError> { + self.to_session_tx.try_send(req) + } +} diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 12c1036ad..c8f787072 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -3,22 +3,26 @@ use crate::{ discovery::{Discovery, DiscoveryEvent}, fetch::StateFetcher, - message::{EthResponse, PeerRequestSender, PeerResponse}, + message::{PeerRequestSender, PeerResponse}, peers::{PeerAction, PeersManager}, NodeId, }; use reth_eth_wire::{capability::Capabilities, Status}; use reth_interfaces::provider::BlockProvider; -use reth_primitives::{H256, U256}; +use reth_primitives::H256; use std::{ collections::{HashMap, VecDeque}, net::SocketAddr, sync::Arc, task::{Context, Poll}, - time::Instant, }; +use tokio::sync::oneshot; +use crate::{ + fetch::BlockResponseOutcome, + message::{BlockRequest, PeerRequest, PeerResponseResult}, +}; use tracing::trace; /// The [`NetworkState`] keeps track of the state of all peers in the network. @@ -36,14 +40,14 @@ pub struct NetworkState { connected_peers: HashMap, /// Manages connections to peers. peers_manager: PeersManager, - /// Tracks the state of connected peers - peers_state: HashMap, /// Buffered messages until polled. queued_messages: VecDeque, /// The client type that can interact with the chain. client: Arc, /// Network discovery. discovery: Discovery, + /// The genesis hash of the network we're on + genesis_hash: H256, /// The type that handles requests. /// /// The fetcher streams RLPx related requests on a per-peer basis to this type. This type will @@ -56,31 +60,57 @@ where C: BlockProvider, { /// Create a new state instance with the given params - pub(crate) fn new(client: Arc, discovery: Discovery, peers_manager: PeersManager) -> Self { + pub(crate) fn new( + client: Arc, + discovery: Discovery, + peers_manager: PeersManager, + genesis_hash: H256, + ) -> Self { Self { connected_peers: Default::default(), peers_manager, - peers_state: Default::default(), queued_messages: Default::default(), client, discovery, + genesis_hash, state_fetcher: Default::default(), } } - /// Event hook for an authenticated session for the peer. - pub(crate) fn on_session_authenticated( + /// Event hook for an activated session for the peer. + /// + /// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and + /// should be rejected. + pub(crate) fn on_session_activated( &mut self, - _node_id: NodeId, - _capabilities: Arc, - _status: Status, - _messages: PeerRequestSender, - ) { - // TODO notify fetecher as well + peer: NodeId, + capabilities: Arc, + status: Status, + request_tx: PeerRequestSender, + ) -> Result<(), AddSessionError> { + // TODO add capacity check + debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible"); + + self.state_fetcher.new_connected_peer(peer, status.blockhash); + + self.connected_peers.insert( + peer, + ConnectedPeer { + best_hash: status.blockhash, + capabilities, + request_tx, + pending_response: None, + }, + ); + + Ok(()) } /// Event hook for a disconnected session for the peer. - pub(crate) fn on_session_closed(&mut self, _node_id: NodeId) {} + pub(crate) fn on_session_closed(&mut self, peer: NodeId) { + self.connected_peers.remove(&peer); + self.state_fetcher.on_session_closed(&peer); + } /// Propagates Block to peers. pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) { @@ -103,21 +133,75 @@ where fn on_peer_action(&mut self, action: PeerAction) { match action { PeerAction::Connect { node_id, remote_addr } => { - self.peers_state.insert(node_id, PeerSessionState::Connecting); self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr }); } PeerAction::Disconnect { node_id } => { - self.peers_state.remove(&node_id); + self.state_fetcher.on_pending_disconnect(&node_id); self.queued_messages.push_back(StateAction::Disconnect { node_id }); } } } /// Disconnect the session - fn disconnect_session(&mut self, _node: NodeId) {} + fn on_session_disconnected(&mut self, peer: NodeId) { + self.connected_peers.remove(&peer); + } + + /// Sends The message to the peer's session and queues in a response. + /// + /// Caution: this will replace an already pending response. It's the responsibility of the + /// caller to select the peer. + fn handle_block_request(&mut self, peer: NodeId, request: BlockRequest) { + if let Some(ref mut peer) = self.connected_peers.get_mut(&peer) { + let (request, response) = match request { + BlockRequest::GetBlockHeaders(request) => { + let (response, rx) = oneshot::channel(); + let request = PeerRequest::GetBlockHeaders { request, response }; + let response = PeerResponse::BlockHeaders { response: rx }; + (request, response) + } + BlockRequest::GetBlockBodies(request) => { + let (response, rx) = oneshot::channel(); + let request = PeerRequest::GetBlockBodies { request, response }; + let response = PeerResponse::BlockBodies { response: rx }; + (request, response) + } + }; + let _ = peer.request_tx.to_session_tx.try_send(request); + peer.pending_response = Some(response); + } + } + + /// Handle the outcome of processed response, for example directly queue another request. + fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) -> Option { + match outcome { + BlockResponseOutcome::Request(peer, request) => { + self.handle_block_request(peer, request); + } + BlockResponseOutcome::BadResponse(_) => { + // TODO handle reputation change + } + } + None + } /// Invoked when received a response from a connected peer. - fn on_eth_response(&mut self, _node: NodeId, _resp: EthResponse) {} + fn on_eth_response(&mut self, peer: NodeId, resp: PeerResponseResult) -> Option { + match resp { + PeerResponseResult::BlockHeaders(res) => { + let outcome = self.state_fetcher.on_block_headers_response(peer, res)?; + return self.on_block_response_outcome(outcome) + } + PeerResponseResult::BlockBodies(res) => { + let outcome = self.state_fetcher.on_block_bodies_response(peer, res)?; + return self.on_block_response_outcome(outcome) + } + PeerResponseResult::PooledTransactions(_) => {} + PeerResponseResult::NodeData(_) => {} + PeerResponseResult::Receipts(_) => {} + } + None + } /// Advances the state pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { @@ -137,7 +221,6 @@ where for (id, peer) in self.connected_peers.iter_mut() { if let Some(response) = peer.pending_response.as_mut() { match response.poll(cx) { - Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)), Poll::Ready(Err(_)) => { trace!( ?id, @@ -146,6 +229,7 @@ where ); disconnect_sessions.push(*id); } + Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)), Poll::Pending => continue, }; } @@ -155,11 +239,13 @@ where } for node in disconnect_sessions { - self.disconnect_session(node) + self.on_session_disconnected(node) } for (id, resp) in received_responses { - self.on_eth_response(id, resp); + if let Some(action) = self.on_eth_response(id, resp) { + self.queued_messages.push_back(action); + } } // poll peer manager @@ -180,34 +266,14 @@ where pub struct ConnectedPeer { /// Best block of the peer. pub(crate) best_hash: H256, - /// Best block number of the peer. - pub(crate) best_number: U256, - /// A communication channel directly to the session service. - pub(crate) message_tx: PeerRequestSender, + /// The capabilities of the connected peer. + pub(crate) capabilities: Arc, + /// A communication channel directly to the session task. + pub(crate) request_tx: PeerRequestSender, /// The response receiver for a currently active request to that peer. pub(crate) pending_response: Option, } -/// Tracks the current state of the peer session -pub enum PeerSessionState { - /// Starting state for outbound connections. - /// - /// This will be triggered by a [`PeerAction::Connect`] action. - /// The peer will reside in the state until the connection has been authenticated. - Connecting, - /// Established connection that hasn't been authenticated yet. - Incoming { - /// How long to keep this open. - until: Instant, - sender: PeerRequestSender, - }, - /// Node is connected to the peer and is ready to - Ready { - /// Communication channel directly to the session task - sender: PeerRequestSender, - }, -} - /// Message variants triggered by the [`State`] pub enum StateAction { /// Create a new connection to the given node. @@ -215,3 +281,12 @@ pub enum StateAction { /// Disconnect an existing connection Disconnect { node_id: NodeId }, } + +#[derive(Debug, thiserror::Error)] +pub enum AddSessionError { + #[error("No capacity for new sessions")] + AtCapacity { + /// The peer of the session + peer: NodeId, + }, +} diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 96dd6cfc8..e62195613 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,7 +1,7 @@ use crate::{ listener::{ConnectionListener, ListenerEvent}, session::{SessionEvent, SessionId, SessionManager}, - state::{NetworkState, StateAction}, + state::{AddSessionError, NetworkState, StateAction}, NodeId, }; use futures::Stream; @@ -68,10 +68,15 @@ where capabilities, status, messages, - } => { - self.state.on_session_authenticated(node_id, capabilities, status, messages); - Some(SwarmEvent::SessionEstablished { node_id, remote_addr }) - } + } => match self.state.on_session_activated(node_id, capabilities, status, messages) { + Ok(_) => Some(SwarmEvent::SessionEstablished { node_id, remote_addr }), + Err(err) => { + match err { + AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer), + }; + None + } + }, SessionEvent::ValidMessage { node_id, message } => { Some(SwarmEvent::CapabilityMessage { node_id, message }) }