diff --git a/Cargo.lock b/Cargo.lock index 6a6dc2c52..c077783af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2155,6 +2155,15 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linked_hash_set" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47186c6da4d81ca383c7c47c1bfc80f4b95f4720514d860a5407aaf4233f9588" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lock_api" version = "0.4.9" @@ -3302,6 +3311,7 @@ dependencies = [ "either", "fnv", "futures", + "linked_hash_set", "parking_lot 0.12.1", "pin-project", "rand 0.8.5", diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 856e5f65e..d7debf954 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -11,6 +11,8 @@ pub enum RequestError { ChannelClosed, #[error("Not connected to the peer.")] NotConnected, + #[error("Connection to a peer dropped while handling the request.")] + ConnectionDropped, #[error("Capability Message is not supported by remote peer.")] UnsupportedCapability, #[error("Request timed out while awaiting response.")] diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index dfe555aea..7a1a7043a 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -35,6 +35,7 @@ parking_lot = "0.12" async-trait = "0.1" bytes = "1.2" either = "1.8" +linked_hash_set = "0.1" secp256k1 = { version = "0.24", features = [ "global-context", diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs new file mode 100644 index 000000000..96bdff23a --- /dev/null +++ b/crates/net/network/src/cache.rs @@ -0,0 +1,58 @@ +use linked_hash_set::LinkedHashSet; +use std::{borrow::Borrow, hash::Hash, num::NonZeroUsize}; + +/// A minimal LRU cache based on a `LinkedHashSet` with limited capacity. +/// +/// If the length exceeds the set capacity, the oldest element will be removed +/// In the limit, for each element inserted the oldest existing element will be removed. +#[derive(Debug, Clone)] +pub struct LruCache { + limit: NonZeroUsize, + inner: LinkedHashSet, +} + +impl LruCache { + /// Creates a new `LruCache` using the given limit + pub fn new(limit: NonZeroUsize) -> Self { + Self { inner: LinkedHashSet::new(), limit } + } + + /// Insert an element into the set. + /// + /// If the element is new (did not exist before [`LruCache::insert()`]) was called, then the + /// given length will be enforced and the oldest element will be removed if the limit was + /// exceeded. + /// + /// If the set did not have this value present, true is returned. + /// If the set did have this value present, false is returned. + pub fn insert(&mut self, entry: T) -> bool { + if self.inner.insert(entry) { + if self.limit.get() == self.inner.len() { + // remove the oldest element in the set + self.inner.pop_front(); + } + return true + } + false + } + + /// Returns `true` if the set contains a value. + pub fn contains(&self, value: &Q) -> bool + where + T: Borrow, + Q: Hash + Eq, + { + self.inner.contains(value) + } +} + +impl Extend for LruCache +where + T: Eq + Hash, +{ + fn extend>(&mut self, iter: I) { + for item in iter.into_iter() { + self.insert(item); + } + } +} diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 4eb0641ce..a999019e2 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -1,4 +1,8 @@ -use crate::{peers::PeersConfig, session::SessionsConfig}; +use crate::{ + import::{BlockImport, NoopBlockImport}, + peers::PeersConfig, + session::SessionsConfig, +}; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT}; use reth_primitives::{Chain, ForkId, H256}; use secp256k1::SecretKey; @@ -30,6 +34,8 @@ pub struct NetworkConfig { pub chain: Chain, /// Genesis hash of the network pub genesis_hash: H256, + /// The block importer type. + pub block_import: Box, } // === impl NetworkConfig === @@ -82,6 +88,8 @@ pub struct NetworkConfigBuilder { chain: Chain, /// Network genesis hash genesis_hash: H256, + /// The block importer type. + block_import: Box, } // === impl NetworkConfigBuilder === @@ -100,6 +108,7 @@ impl NetworkConfigBuilder { fork_id: None, chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet), genesis_hash: Default::default(), + block_import: Box::::default(), } } @@ -109,6 +118,12 @@ impl NetworkConfigBuilder { self } + /// Sets the [`BlockImport`] type to configure. + pub fn block_import(mut self, block_import: T) -> Self { + self.block_import = Box::new(block_import); + self + } + /// Consumes the type and creates the actual [`NetworkConfig`] pub fn build(self) -> NetworkConfig { let Self { @@ -122,6 +137,7 @@ impl NetworkConfigBuilder { fork_id, chain, genesis_hash, + block_import, } = self; NetworkConfig { client, @@ -138,6 +154,7 @@ impl NetworkConfigBuilder { fork_id, chain, genesis_hash, + block_import, } } } diff --git a/crates/net/network/src/fetch.rs b/crates/net/network/src/fetch.rs index c870b6149..151051b76 100644 --- a/crates/net/network/src/fetch.rs +++ b/crates/net/network/src/fetch.rs @@ -1,10 +1,13 @@ //! Fetch data from the network. -use crate::message::BlockRequest; +use crate::{message::BlockRequest, peers::ReputationChange}; use futures::StreamExt; -use reth_eth_wire::{BlockBody, EthMessage}; -use reth_interfaces::p2p::{error::RequestResult, headers::client::HeadersRequest}; -use reth_primitives::{Header, PeerId, H256, U256}; +use reth_eth_wire::{BlockBody, GetBlockBodies}; +use reth_interfaces::p2p::{ + error::{RequestError, RequestResult}, + headers::client::HeadersRequest, +}; +use reth_primitives::{Header, PeerId, H256}; use std::{ collections::{HashMap, VecDeque}, task::{Context, Poll}, @@ -20,6 +23,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream; pub struct StateFetcher { /// Currently active [`GetBlockHeaders`] requests inflight_headers_requests: HashMap>>>, + /// Currently active [`GetBlockBodies`] requests + inflight_bodies_requests: HashMap, RequestResult>>>, /// The list of available peers for requests. peers: HashMap, /// Requests queued for processing @@ -34,26 +39,55 @@ pub struct StateFetcher { impl StateFetcher { /// Invoked when connected to a new peer. - pub(crate) fn new_connected_peer(&mut self, _node_id: PeerId, _best_hash: H256) {} + pub(crate) fn new_connected_peer( + &mut self, + peer_id: PeerId, + best_hash: H256, + best_number: Option, + ) { + self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); + } /// Invoked when an active session was closed. - pub(crate) fn on_session_closed(&mut self, _peer: &PeerId) {} + /// + /// This cancels als inflight request and sends an error to the receiver. + pub(crate) fn on_session_closed(&mut self, peer: &PeerId) { + self.peers.remove(peer); + if let Some(req) = self.inflight_headers_requests.remove(peer) { + let _ = req.response.send(Err(RequestError::ConnectionDropped)); + } + if let Some(req) = self.inflight_bodies_requests.remove(peer) { + let _ = req.response.send(Err(RequestError::ConnectionDropped)); + } + } /// Invoked when an active session is about to be disconnected. - pub(crate) fn on_pending_disconnect(&mut self, _peer: &PeerId) {} + pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.state = PeerState::Closing; + } + } + + /// Returns the next idle peer that's ready to accept a request + fn next_peer(&mut self) -> Option<(&PeerId, &mut Peer)> { + self.peers.iter_mut().find(|(_, peer)| peer.state.is_idle()) + } /// Returns the next action to return fn poll_action(&mut self) -> Option { - // TODO find matching peers + if self.queued_requests.is_empty() { + return None + } - // if let Some(request) = self.queued_requests.pop_front() { - // if let Some(action) = self.on_download_request(request) { - // return Poll::Ready(action) - // } - // } - None + let peer_id = *self.next_peer()?.0; + + let request = self.queued_requests.pop_front().expect("not empty; qed"); + let request = self.prepare_block_request(peer_id, request); + + Some(FetchAction::BlockRequest { peer_id, request }) } + /// Received a request via a downloader fn on_download_request(&mut self, request: DownloadRequest) -> Option { match request { DownloadRequest::GetBlockHeaders { request: _, response: _ } => {} @@ -91,21 +125,79 @@ impl StateFetcher { Poll::Pending } + /// Handles a new request to a peer. + /// + /// Caution: this assumes the peer exists and is idle + fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest) -> BlockRequest { + // update the peer's state + if let Some(peer) = self.peers.get_mut(&peer_id) { + peer.state = req.peer_state(); + } + + let started = Instant::now(); + match req { + DownloadRequest::GetBlockHeaders { request, response } => { + let inflight = Request { request, response, started }; + self.inflight_headers_requests.insert(peer_id, inflight); + + unimplemented!("unify start types"); + + // BlockRequest::GetBlockHeaders(GetBlockHeaders { + // // TODO: this should be converted + // start_block: BlockHashOrNumber::Number(0), + // limit: request.limit, + // skip: 0, + // reverse: request.reverse, + // }) + } + DownloadRequest::GetBlockBodies { request, response } => { + let inflight = Request { request: request.clone(), response, started }; + self.inflight_bodies_requests.insert(peer_id, inflight); + BlockRequest::GetBlockBodies(GetBlockBodies(request)) + } + } + } + + /// Returns a new followup request for the peer. + /// + /// Caution: this expects that the peer is _not_ closed + fn followup_request(&mut self, peer_id: PeerId) -> Option { + let req = self.queued_requests.pop_front()?; + let req = self.prepare_block_request(peer_id, req); + Some(BlockResponseOutcome::Request(peer_id, req)) + } + /// Called on a `GetBlockHeaders` response from a peer pub(crate) fn on_block_headers_response( &mut self, - _peer: PeerId, - _res: RequestResult>, + peer_id: PeerId, + res: RequestResult>, ) -> Option { + if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) { + let _ = resp.response.send(res); + } + if let Some(peer) = self.peers.get_mut(&peer_id) { + if peer.state.on_request_finished() { + return self.followup_request(peer_id) + } + } None } /// Called on a `GetBlockBodies` response from a peer pub(crate) fn on_block_bodies_response( &mut self, - _peer: PeerId, - _res: RequestResult>, + peer_id: PeerId, + res: RequestResult>, ) -> Option { + if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) { + let _ = resp.response.send(res); + } + if let Some(peer) = self.peers.get_mut(&peer_id) { + if peer.state.on_request_finished() { + return self.followup_request(peer_id) + } + } None } @@ -120,6 +212,7 @@ impl Default for StateFetcher { let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); Self { inflight_headers_requests: Default::default(), + inflight_bodies_requests: Default::default(), peers: Default::default(), queued_requests: Default::default(), download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), @@ -148,14 +241,12 @@ impl HeadersDownloader { /// Represents a connected peer struct Peer { - /// Identifier for requests. - request_id: u64, /// The state this peer currently resides in. state: PeerState, /// Best known hash that the peer has best_hash: H256, - /// Best known number the peer has. - best_number: U256, + /// Tracks the best number of the peer. + best_number: Option, } /// Tracks the state of an individual peer @@ -164,6 +255,32 @@ enum PeerState { Idle, /// Peer is handling a `GetBlockHeaders` request. GetBlockHeaders, + /// Peer is handling a `GetBlockBodies` request. + GetBlockBodies, + /// Peer session is about to close + Closing, +} + +// === impl PeerState === + +impl PeerState { + /// Returns true if the peer is currently idle. + fn is_idle(&self) -> bool { + matches!(self, PeerState::Idle) + } + + /// Resets the state on a received response. + /// + /// If the state was already marked as `Closing` do nothing. + /// + /// Returns `true` if the peer is ready for another request. + fn on_request_finished(&mut self) -> bool { + if !matches!(self, PeerState::Closing) { + *self = PeerState::Idle; + return true + } + false + } } /// A request that waits for a response from the network so it can send it back through the response @@ -185,13 +302,26 @@ enum DownloadRequest { GetBlockBodies { request: Vec, response: oneshot::Sender>> }, } +// === impl DownloadRequest === + +impl DownloadRequest { + /// Returns the corresponding state for a peer that handles the request. + fn peer_state(&self) -> PeerState { + match self { + DownloadRequest::GetBlockHeaders { .. } => PeerState::GetBlockHeaders, + DownloadRequest::GetBlockBodies { .. } => PeerState::GetBlockBodies, + } + } +} + /// An action the syncer can emit. pub(crate) enum FetchAction { /// Dispatch an eth request to the given peer. - EthRequest { - node_id: PeerId, + BlockRequest { + /// The targeted recipient for the request + peer_id: PeerId, /// The request to send - request: EthMessage, + request: BlockRequest, }, } @@ -202,7 +332,6 @@ pub(crate) enum FetchAction { pub(crate) enum BlockResponseOutcome { /// Continue with another request to the peer. Request(PeerId, BlockRequest), - /// How to handle a bad response - // TODO this should include some form of reputation change - BadResponse(PeerId), + /// How to handle a bad response and the reputation change to apply. + BadResponse(PeerId, ReputationChange), } diff --git a/crates/net/network/src/import.rs b/crates/net/network/src/import.rs new file mode 100644 index 000000000..3a54d76dd --- /dev/null +++ b/crates/net/network/src/import.rs @@ -0,0 +1,42 @@ +use crate::message::NewBlockMessage; +use reth_primitives::PeerId; +use std::task::{Context, Poll}; + +/// Abstraction over block import. +pub trait BlockImport: Send + Sync { + /// Invoked for a received `NewBlock` broadcast message from the peer. + /// + /// > When a `NewBlock` announcement message is received from a peer, the client first verifies + /// > the basic header validity of the block, checking whether the proof-of-work value is valid. + /// + /// This is supposed to start verification. The results are then expected to be returned via + /// [`BlockImport::poll`]. + fn on_new_block(&mut self, peer_id: PeerId, incoming_block: NewBlockMessage); + + /// Returns the results of a [`BlockImport::on_new_block`] + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; +} + +/// Outcome of the [`BlockImport`]'s block handling. +pub struct BlockImportOutcome { + /// Sender of the `NewBlock` message. + pub peer: PeerId, + /// The result after validating the block + pub result: Result, +} + +/// Represents the error case of a failed block import +pub enum BlockImportError {} + +/// An implementation of `BlockImport` that does nothing +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct NoopBlockImport; + +impl BlockImport for NoopBlockImport { + fn on_new_block(&mut self, _peer_id: PeerId, _incoming_block: NewBlockMessage) {} + + fn poll(&mut self, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } +} diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 45b1c7097..831f1a897 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -15,10 +15,12 @@ //! port of that network. This includes public identities (public key) and addresses (where to reach //! them). +mod cache; mod config; mod discovery; pub mod error; mod fetch; +mod import; mod listener; mod manager; mod message; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 340efe113..0c5bc208e 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -19,7 +19,9 @@ use crate::{ config::NetworkConfig, discovery::Discovery, error::NetworkError, + import::{BlockImport, BlockImportOutcome}, listener::ConnectionListener, + message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, network::{NetworkHandle, NetworkHandleMessage}, peers::PeersManager, session::SessionManager, @@ -30,9 +32,9 @@ use futures::{Future, StreamExt}; use parking_lot::Mutex; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, - EthMessage, + GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, }; -use reth_interfaces::provider::BlockProvider; +use reth_interfaces::{p2p::error::RequestResult, provider::BlockProvider}; use reth_primitives::PeerId; use std::{ net::SocketAddr, @@ -43,7 +45,7 @@ use std::{ }, task::{Context, Poll}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{error, trace}; @@ -77,8 +79,8 @@ pub struct NetworkManager { handle: NetworkHandle, /// Receiver half of the command channel set up between this type and the [`NetworkHandle`] from_handle_rx: UnboundedReceiverStream, - /// Handles block imports. - block_import_sink: (), + /// Handles block imports according to the `eth` protocol. + block_import: Box, /// The address of this node that listens for incoming connections. listener_address: Arc>, /// All listeners for [`Network`] events. @@ -112,6 +114,7 @@ where peers_config, sessions_config, genesis_hash, + block_import, .. } = config; @@ -145,7 +148,7 @@ where swarm, handle, from_handle_rx: UnboundedReceiverStream::new(from_handle_rx), - block_import_sink: (), + block_import, listener_address, event_listeners: Default::default(), num_active_peers, @@ -171,41 +174,47 @@ where // TODO: disconnect? } - /// Handles a received [`CapabilityMessage`] from the peer. - fn on_capability_message(&mut self, _node_id: PeerId, msg: CapabilityMessage) { - match msg { - CapabilityMessage::Eth(eth) => { - match eth { - EthMessage::Status(_) => {} - EthMessage::NewBlockHashes(_) => { - // update peer's state, to track what blocks this peer has seen - } - EthMessage::NewBlock(_) => { - // emit new block and track that the peer knows this block - } - EthMessage::Transactions(_) => { - // need to emit this as event/send to tx handler - } - EthMessage::NewPooledTransactionHashes(_) => { - // need to emit this as event/send to tx handler - } + /// Handle an incoming request from the peer + fn on_eth_request(&mut self, peer_id: PeerId, req: PeerRequest) { + match req { + PeerRequest::GetBlockHeaders { .. } => {} + PeerRequest::GetBlockBodies { .. } => {} + PeerRequest::GetPooledTransactions { request, response } => { + // notify listeners about this request + self.event_listeners.send(NetworkEvent::GetPooledTransactions { + peer_id, + request, + response: Arc::new(response), + }); + } + PeerRequest::GetNodeData { .. } => {} + PeerRequest::GetReceipts { .. } => {} + } + } - // TODO: should remove the response types here, as they are handled separately - EthMessage::GetBlockHeaders(_) => {} - EthMessage::BlockHeaders(_) => {} - EthMessage::GetBlockBodies(_) => {} - EthMessage::BlockBodies(_) => {} - EthMessage::GetPooledTransactions(_) => {} - EthMessage::PooledTransactions(_) => {} - EthMessage::GetNodeData(_) => {} - EthMessage::NodeData(_) => {} - EthMessage::GetReceipts(_) => {} - EthMessage::Receipts(_) => {} - } + /// Handles a received Message from the peer. + fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { + match msg { + PeerMessage::NewBlockHashes(hashes) => { + // update peer's state, to track what blocks this peer has seen + self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0) } - CapabilityMessage::Other(_) => { - // other subprotocols + PeerMessage::NewBlock(block) => { + self.swarm.state_mut().on_new_block(peer_id, block.hash); + // start block import process + self.block_import.on_new_block(peer_id, block); } + PeerMessage::PooledTransactions(msg) => { + self.event_listeners + .send(NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg }); + } + PeerMessage::Transactions(msg) => { + self.event_listeners.send(NetworkEvent::IncomingTransactions { peer_id, msg }); + } + PeerMessage::EthRequest(req) => { + self.on_eth_request(peer_id, req); + } + PeerMessage::Other(_) => {} } } @@ -215,10 +224,25 @@ where NetworkHandleMessage::EventListener(tx) => { self.event_listeners.listeners.push(tx); } - NetworkHandleMessage::NewestBlock(_, _) => {} - _ => {} + NetworkHandleMessage::AnnounceBlock(block, hash) => { + let msg = NewBlockMessage { hash, block: Arc::new(block) }; + self.swarm.state_mut().announce_new_block(msg); + } + NetworkHandleMessage::EthRequest { peer_id, request } => { + self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request)) + } + NetworkHandleMessage::SendTransaction { peer_id, msg } => { + self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::Transactions(msg)) + } + NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self + .swarm + .sessions_mut() + .send_message(&peer_id, PeerMessage::PooledTransactions(msg)), } } + + /// Invoked after a `NewBlock` message from the peer was validated + fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {} } impl Future for NetworkManager @@ -230,6 +254,11 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); + // poll new block imports + while let Poll::Ready(outcome) = this.block_import.poll(cx) { + this.on_block_import_result(outcome); + } + // process incoming messages from a handle loop { match this.from_handle_rx.poll_next_unpin(cx) { @@ -248,8 +277,8 @@ where while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) { // handle event match event { - SwarmEvent::CapabilityMessage { node_id, message } => { - this.on_capability_message(node_id, message) + SwarmEvent::ValidMessage { node_id, message } => { + this.on_peer_message(node_id, message) } SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => { this.on_invalid_message(node_id, capabilities, message) @@ -266,25 +295,38 @@ where SwarmEvent::OutgoingTcpConnection { remote_addr } => { trace!(?remote_addr, target = "net", "Starting outbound connection."); } - SwarmEvent::SessionEstablished { node_id, remote_addr } => { + SwarmEvent::SessionEstablished { + node_id: peer_id, + remote_addr, + capabilities, + messages, + } => { let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; trace!( ?remote_addr, - ?node_id, + ?peer_id, ?total_active, target = "net", "Session established" ); + + this.event_listeners.send(NetworkEvent::SessionEstablished { + peer_id, + capabilities, + messages, + }); } - SwarmEvent::SessionClosed { node_id, remote_addr } => { + SwarmEvent::SessionClosed { node_id: peer_id, remote_addr } => { let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; trace!( ?remote_addr, - ?node_id, + ?peer_id, ?total_active, target = "net", "Session disconnected" ); + + this.event_listeners.send(NetworkEvent::SessionClosed { peer_id }); } SwarmEvent::IncomingPendingSessionClosed { .. } => {} SwarmEvent::OutgoingPendingSessionClosed { .. } => {} @@ -292,14 +334,33 @@ where } } - todo!() + Poll::Pending } } /// Events emitted by the network that are of interest for subscribers. +/// +/// This includes any event types that may be relevant to tasks #[derive(Debug, Clone)] pub enum NetworkEvent { - EthMessage { node_id: PeerId, message: EthMessage }, + /// Closed the peer session. + SessionClosed { peer_id: PeerId }, + /// Established a new session with the given peer. + SessionEstablished { + peer_id: PeerId, + capabilities: Arc, + messages: PeerRequestSender, + }, + /// Received list of transactions to the given peer. + IncomingTransactions { peer_id: PeerId, msg: Arc }, + /// Received list of transactions hashes to the given peer. + IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc }, + /// Incoming `GetPooledTransactions` request from a peer. + GetPooledTransactions { + peer_id: PeerId, + request: GetPooledTransactions, + response: Arc>>, + }, } /// Bundles all listeners for [`NetworkEvent`]s. diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 3aaaca3cc..20abe492e 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -5,26 +5,38 @@ use futures::FutureExt; use reth_eth_wire::{ - BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, - GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions, - Receipts, Transactions, + capability::CapabilityMessage, BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, + GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, + NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Transactions, }; -use std::task::{ready, Context, Poll}; - -use reth_eth_wire::capability::CapabilityMessage; use reth_interfaces::p2p::error::RequestResult; -use reth_primitives::{Header, PeerId, Receipt, TransactionSigned}; +use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256}; +use std::{ + sync::Arc, + task::{ready, Context, Poll}, +}; use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot}; +/// Internal form of a `NewBlock` message +#[derive(Debug, Clone)] +pub struct NewBlockMessage { + /// Hash of the block + pub hash: H256, + /// Raw received message + pub block: Arc, +} + /// Represents all messages that can be sent to a peer session #[derive(Debug)] pub enum PeerMessage { /// Announce new block hashes NewBlockHashes(NewBlockHashes), /// Broadcast new block. - NewBlock(Box), + NewBlock(NewBlockMessage), /// Broadcast transactions. - Transactions(Transactions), + Transactions(Arc), + /// + PooledTransactions(Arc), /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index c38986e94..9c8f079de 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,6 +1,7 @@ -use crate::{manager::NetworkEvent, peers::PeersHandle}; +use crate::{manager::NetworkEvent, message::PeerRequest, peers::PeersHandle}; use parking_lot::Mutex; -use reth_primitives::{PeerId, H256, U256}; +use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions}; +use reth_primitives::{PeerId, H256}; use std::{ net::SocketAddr, sync::{atomic::AtomicUsize, Arc}, @@ -47,6 +48,16 @@ impl NetworkHandle { let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); rx } + + /// Sends a [`NetworkHandleMessage`] to the manager + fn send_message(&self, msg: NetworkHandleMessage) { + let _ = self.inner.to_manager_tx.send(msg); + } + + /// Sends a [`PeerRequest`] to the given peer's session. + pub fn send_request(&mut self, peer_id: PeerId, request: PeerRequest) { + self.send_message(NetworkHandleMessage::EthRequest { peer_id, request }) + } } struct NetworkInner { @@ -59,15 +70,25 @@ struct NetworkInner { /// The identifier used by this node. local_node_id: PeerId, /// Access to the all the nodes - peers: PeersHandle, // TODO need something to access + peers: PeersHandle, } /// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). +#[allow(missing_docs)] pub(crate) enum NetworkHandleMessage { /// Add a new listener for [`NetworkEvent`]. EventListener(UnboundedSender), /// Broadcast event to announce a new block to all nodes. - AnnounceBlock, - /// Returns the newest imported block by the network. - NewestBlock(H256, U256), + AnnounceBlock(NewBlock, H256), + /// Sends the list of transactions to the given peer. + SendTransaction { peer_id: PeerId, msg: Arc }, + /// Sends the list of transactions hashes to the given peer. + SendPooledTransactionHashes { peer_id: PeerId, msg: Arc }, + /// Send an `eth` protocol request to the peer. + EthRequest { + /// The peer to send the request to. + peer_id: PeerId, + /// The request to send to the peer's sessions. + request: PeerRequest, + }, } diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index f7cd579ca..6a9248e3d 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -1,5 +1,8 @@ //! Session handles -use crate::session::{Direction, SessionId}; +use crate::{ + message::PeerMessage, + session::{Direction, SessionId}, +}; use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, @@ -93,7 +96,8 @@ pub(crate) enum PendingSessionEvent { pub(crate) enum SessionCommand { /// Disconnect the connection Disconnect, - Message(CapabilityMessage), + /// Sends a message to the peer + Message(PeerMessage), } /// Message variants an active session can produce and send back to the @@ -107,7 +111,7 @@ pub(crate) enum ActiveSessionMessage { /// Identifier of the remote peer. node_id: PeerId, /// Message received from the peer. - message: CapabilityMessage, + message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidMessage { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index c3faecba2..5c6657422 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,9 +1,13 @@ //! Support for handling peer sessions. pub use crate::message::PeerRequestSender; -use crate::session::{ - active::ActiveSession, - handle::{ - ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, +use crate::{ + message::PeerMessage, + session::{ + active::ActiveSession, + handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + SessionCommand, + }, }, }; use fnv::FnvHashMap; @@ -194,6 +198,13 @@ impl SessionManager { } } + /// Sends a message to the peer's session + pub(crate) fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) { + if let Some(session) = self.active_sessions.get_mut(peer_id) { + let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)); + } + } + /// This polls all the session handles and returns [`SessionEvent`]. /// /// Active sessions are prioritized. @@ -406,7 +417,7 @@ pub(crate) enum SessionEvent { ValidMessage { node_id: PeerId, /// Message received from the peer. - message: CapabilityMessage, + message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidMessage { diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index b2404c084..38645098a 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -1,29 +1,31 @@ //! Keeps track of the state of the network. use crate::{ + cache::LruCache, discovery::{Discovery, DiscoveryEvent}, - fetch::StateFetcher, - message::{PeerRequestSender, PeerResponse}, + fetch::{BlockResponseOutcome, StateFetcher}, + message::{ + BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, + PeerResponseResult, + }, peers::{PeerAction, PeersManager}, }; - -use reth_eth_wire::{capability::Capabilities, Status}; +use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status}; use reth_interfaces::provider::BlockProvider; use reth_primitives::{PeerId, H256}; use std::{ collections::{HashMap, VecDeque}, net::SocketAddr, + num::NonZeroUsize, sync::Arc, task::{Context, Poll}, }; use tokio::sync::oneshot; - -use crate::{ - fetch::BlockResponseOutcome, - message::{BlockRequest, PeerRequest, PeerResponseResult}, -}; use tracing::trace; +/// Cache limit of blocks to keep track of for a single peer. +const PEER_BLOCK_CACHE_LIMIT: usize = 512; + /// The [`NetworkState`] keeps track of the state of all peers in the network. /// /// This includes: @@ -90,7 +92,9 @@ where // TODO add capacity check debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible"); - self.state_fetcher.new_connected_peer(peer, status.blockhash); + // find the corresponding block number + let block_number = self.client.block_number(status.blockhash).ok().flatten(); + self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number); self.connected_peers.insert( peer, @@ -99,6 +103,7 @@ where capabilities, request_tx, pending_response: None, + blocks: LruCache::new(NonZeroUsize::new(PEER_BLOCK_CACHE_LIMIT).unwrap()), }, ); @@ -111,12 +116,59 @@ where self.state_fetcher.on_session_closed(&peer); } - /// Propagates Block to peers. - pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) { - // TODO propagate the newblock messages to all connected peers that haven't seen the block - // yet + /// Starts propagating the new block to peers that haven't reported the block yet. + /// + /// This is supposed to be invoked after the block was validated. + /// + /// > It then sends the block to a small fraction of connected peers (usually the square root of + /// > the total number of peers) using the `NewBlock` message. + /// + /// See also + pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage) { + // send a `NewBlock` message to a fraction fo the connected peers (square root of the total + // number of peers) + let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1; - todo!() + let mut count = 0; + for (peer_id, peer) in self.connected_peers.iter_mut() { + if peer.blocks.contains(&msg.hash) { + // skip peers which already reported the block + continue + } + + // Queue a `NewBlock` message for the peer + if count < num_propagate { + self.queued_messages + .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() }); + + // mark the block as seen by the peer + peer.blocks.insert(msg.hash); + + count += 1; + } + + if count >= num_propagate { + break + } + } + } + + /// Invoked after a `NewBlock` message was received by the peer. + /// + /// This will keep track of blocks we know a peer has + pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: H256) { + // Mark the blocks as seen + if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + peer.blocks.insert(hash); + } + } + + /// Invoked for a `NewBlockHashes` broadcast message. + pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec) { + // Mark the blocks as seen + if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + peer.blocks.extend(hashes.into_iter().map(|b| b.hash)); + } } /// Event hook for events received from the discovery service. @@ -183,7 +235,7 @@ where BlockResponseOutcome::Request(peer, request) => { self.handle_block_request(peer, request); } - BlockResponseOutcome::BadResponse(_) => { + BlockResponseOutcome::BadResponse(_peer, _reputation_change) => { // TODO handle reputation change } } @@ -277,10 +329,19 @@ pub struct ConnectedPeer { pub(crate) request_tx: PeerRequestSender, /// The response receiver for a currently active request to that peer. pub(crate) pending_response: Option, + /// Blocks we know the peer has. + pub(crate) blocks: LruCache, } /// Message variants triggered by the [`State`] pub enum StateAction { + /// Dispatch a `NewBlock` message to the peer + NewBlock { + /// Target of the message + peer_id: PeerId, + /// The `NewBlock` message + block: NewBlockMessage, + }, /// Create a new connection to the given node. Connect { remote_addr: SocketAddr, node_id: PeerId }, /// Disconnect an existing connection diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 136f11abd..9acb9a86f 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,5 +1,6 @@ use crate::{ listener::{ConnectionListener, ListenerEvent}, + message::{PeerMessage, PeerRequestSender}, session::{SessionEvent, SessionId, SessionManager}, state::{AddSessionError, NetworkState, StateAction}, }; @@ -56,6 +57,11 @@ where &mut self.state } + /// Mutable access to the [`SessionManager`]. + pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager { + &mut self.sessions + } + /// Triggers a new outgoing connection to the given node pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) { self.sessions.dial_outbound(remote_addr, remote_id) @@ -70,8 +76,18 @@ where capabilities, status, messages, - } => match self.state.on_session_activated(node_id, capabilities, status, messages) { - Ok(_) => Some(SwarmEvent::SessionEstablished { node_id, remote_addr }), + } => match self.state.on_session_activated( + node_id, + capabilities.clone(), + status, + messages.clone(), + ) { + Ok(_) => Some(SwarmEvent::SessionEstablished { + node_id, + remote_addr, + capabilities, + messages, + }), Err(err) => { match err { AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer), @@ -80,7 +96,7 @@ where } }, SessionEvent::ValidMessage { node_id, message } => { - Some(SwarmEvent::CapabilityMessage { node_id, message }) + Some(SwarmEvent::ValidMessage { node_id, message }) } SessionEvent::InvalidMessage { node_id, capabilities, message } => { Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message }) @@ -133,6 +149,10 @@ where StateAction::Disconnect { node_id } => { self.sessions.disconnect(node_id); } + StateAction::NewBlock { peer_id, block: msg } => { + let msg = PeerMessage::NewBlock(msg); + self.sessions.send_message(&peer_id, msg); + } } None } @@ -191,11 +211,11 @@ where /// network. pub enum SwarmEvent { /// Events related to the actual network protocol. - CapabilityMessage { + ValidMessage { /// The peer that sent the message node_id: PeerId, /// Message received from the peer - message: CapabilityMessage, + message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidCapabilityMessage { @@ -230,6 +250,8 @@ pub enum SwarmEvent { SessionEstablished { node_id: PeerId, remote_addr: SocketAddr, + capabilities: Arc, + messages: PeerRequestSender, }, SessionClosed { node_id: PeerId, diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index d5f12cd28..9a0a07630 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -1,12 +1,25 @@ //! Transaction management for the p2p network. -use crate::{manager::NetworkEvent, NetworkHandle}; -use reth_primitives::{Transaction, H256}; +use crate::{cache::LruCache, manager::NetworkEvent, message::PeerRequestSender, NetworkHandle}; +use futures::stream::FuturesUnordered; +use reth_primitives::{PeerId, Transaction, H256}; use reth_transaction_pool::TransactionPool; -use std::collections::HashMap; +use std::{ + collections::{hash_map::Entry, HashMap}, + future::Future, + num::NonZeroUsize, + pin::Pin, + sync::Arc, +}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; +/// Cache limit of transactions to keep track of for a single peer. +const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024; + +/// The future for inserting a function into the pool +pub type PoolImportFuture = Pin + Send>>; + /// Api to interact with [`TransactionsManager`] task. pub struct TransactionsHandle { /// Command channel to the [`TransactionsManager`] @@ -39,10 +52,15 @@ pub struct TransactionsManager { /// /// From which we get all new incoming transaction related messages. network_events: UnboundedReceiverStream, - /// All currently pending transactions - pending_transactions: (), - /// All the peers that have sent the same transactions. - peers: HashMap>, + /// All currently pending transactions grouped by peers. + /// + /// This way we can track incoming transactions and prevent multiple pool imports for the same + /// transaction + transactions_by_peers: HashMap>, + /// Transactions that are currently imported into the `Pool` + pool_imports: FuturesUnordered, + /// All the connected peers. + peers: HashMap, /// Send half for the command channel. command_tx: mpsc::UnboundedSender, /// Incoming commands from [`TransactionsHandle`]. @@ -64,7 +82,8 @@ where pool, network, network_events: UnboundedReceiverStream::new(network_events), - pending_transactions: (), + transactions_by_peers: Default::default(), + pool_imports: Default::default(), peers: Default::default(), command_tx, command_rx: UnboundedReceiverStream::new(command_rx), @@ -76,10 +95,64 @@ where TransactionsHandle { manager_tx: self.command_tx.clone() } } + /// Handles a received event + async fn on_event(&mut self, event: NetworkEvent) { + match event { + NetworkEvent::SessionClosed { peer_id } => { + // remove the peer + self.peers.remove(&peer_id); + } + NetworkEvent::SessionEstablished { peer_id, messages, .. } => { + // insert a new peer + self.peers.insert( + peer_id, + Peer { + transactions: LruCache::new( + NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), + ), + request_tx: messages, + }, + ); + + // TODO send `NewPooledTransactionHashes + } + NetworkEvent::IncomingTransactions { peer_id, msg } => { + let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()); + + if let Some(peer) = self.peers.get_mut(&peer_id) { + for tx in transactions.0 { + // track that the peer knows this transaction + peer.transactions.insert(tx.hash); + + match self.transactions_by_peers.entry(tx.hash) { + Entry::Occupied(mut entry) => { + // transaction was already inserted + entry.get_mut().push(peer_id); + } + Entry::Vacant(_) => { + // TODO import into the pool + } + } + } + } + } + NetworkEvent::IncomingPooledTransactionHashes { .. } => {} + NetworkEvent::GetPooledTransactions { .. } => {} + } + } + /// Executes an endless future pub async fn run(self) {} } +/// Tracks a single peer +struct Peer { + /// Keeps track of transactions that we know the peer has seen. + transactions: LruCache, + /// A communication channel directly to the session task. + request_tx: PeerRequestSender, +} + /// Commands to send to the [`TransactionManager`] enum TransactionsCommand { Propagate(H256),