diff --git a/Cargo.lock b/Cargo.lock index ba818364f..a3b2bbacf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3238,6 +3238,35 @@ dependencies = [ "libc", ] +[[package]] +name = "reth-network" +version = "0.1.0" +dependencies = [ + "aquamarine", + "async-trait", + "bytes", + "either", + "fnv", + "futures", + "parking_lot 0.12.1", + "pin-project", + "rand 0.8.5", + "reth-discv4", + "reth-ecies", + "reth-eth-wire", + "reth-interfaces", + "reth-primitives", + "reth-rlp", + "reth-rlp-derive", + "reth-transaction-pool", + "secp256k1", + "smol_str", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-p2p" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 53a94b144..1036ccc0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "crates/net/ecies", "crates/net/eth-wire", "crates/net/discv4", + "crates/net/network", "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs index f594d44d5..bcafafc15 100644 --- a/crates/net/discv4/src/config.rs +++ b/crates/net/discv4/src/config.rs @@ -33,6 +33,12 @@ pub struct Discv4Config { pub ban_duration: Option, /// Nodes to boot from. pub bootstrap_nodes: HashSet, + /// Whether to randomly discover new peers. + /// + /// If true, the node will automatically randomly walk the DHT in order to find new peers. + pub enable_dht_random_walk: bool, + /// Whether to automatically lookup peers. + pub enable_lookup: bool, } impl Discv4Config { @@ -55,10 +61,13 @@ impl Default for Discv4Config { permit_ban_list: PermitBanList::default(), ban_duration: Some(Duration::from_secs(3600)), // 1 hour bootstrap_nodes: Default::default(), + enable_dht_random_walk: true, + enable_lookup: true, } } } +/// Builder type for [`Discv4Config`] #[derive(Debug, Default)] pub struct Discv4ConfigBuilder { config: Discv4Config, @@ -89,6 +98,18 @@ impl Discv4ConfigBuilder { self } + /// Whether to discover random nodes in the DHT. + pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self { + self.config.enable_dht_random_walk = enable_dht_random_walk; + self + } + + /// Whether to automatically lookup + pub fn enable_lookup(&mut self, enable_lookup: bool) -> &mut Self { + self.config.enable_lookup = enable_lookup; + self + } + /// A set of lists that permit or ban IP's or NodeIds from the server. See /// `crate::PermitBanList`. pub fn permit_ban_list(&mut self, list: PermitBanList) -> &mut Self { @@ -122,7 +143,26 @@ impl Discv4ConfigBuilder { self } - pub fn build(&mut self) -> Discv4Config { + /// Returns the configured [`Discv4Config`] + pub fn build(&self) -> Discv4Config { self.config.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_builder() { + let mut builder = Discv4Config::builder(); + let _ = builder + .enable_lookup(true) + .enable_dht_random_walk(true) + .add_boot_nodes(HashSet::new()) + .ban_duration(None) + .lookup_interval(Duration::from_secs(3)) + .enable_lookup(true) + .build(); + } +} diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index a0638bcf3..2d9f89969 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -57,7 +57,7 @@ pub mod error; mod proto; mod config; -pub use config::Discv4Config; +pub use config::{Discv4Config, Discv4ConfigBuilder}; mod node; pub use node::NodeRecord; @@ -218,11 +218,17 @@ impl Discv4 { async fn lookup_node(&self, node_id: Option) -> Result, Discv4Error> { let (tx, rx) = oneshot::channel(); - let cmd = Discv4Command::Lookup { node_id, tx }; + let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) }; self.to_service.send(cmd).await?; Ok(rx.await?) } + /// Triggers a new self lookup without expecting a response + pub fn send_lookup_self(&self) { + let cmd = Discv4Command::Lookup { node_id: None, tx: None }; + let _ = self.to_service.try_send(cmd); + } + /// Returns the receiver half of new listener channel that streams [`TableUpdate`]s. pub async fn update_stream(&self) -> Result, Discv4Error> { let (tx, rx) = oneshot::channel(); @@ -326,6 +332,12 @@ impl Discv4Service { let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout); + let lookup_rotator = if config.enable_dht_random_walk { + LookupTargetRotator::default() + } else { + LookupTargetRotator::local_only() + }; + Discv4Service { local_address, local_enr, @@ -345,7 +357,7 @@ impl Discv4Service { ping_interval, evict_expired_requests_interval, config, - lookup_rotator: Default::default(), + lookup_rotator, } } @@ -846,7 +858,7 @@ impl Discv4Service { /// if it has sent a valid Pong response with matching ping hash within the last 12 hours. pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { // trigger self lookup - if self.lookup_interval.poll_tick(cx).is_ready() { + if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() { let target = self.lookup_rotator.next(&self.local_enr.id); self.lookup_with(target, None); } @@ -856,7 +868,7 @@ impl Discv4Service { self.evict_expired_requests(Instant::now()) } - // reping some peers + // re-ping some peers if self.ping_interval.poll_tick(cx).is_ready() { self.reping_oldest(); } @@ -869,7 +881,7 @@ impl Discv4Service { match cmd { Discv4Command::Lookup { node_id, tx } => { let node_id = node_id.unwrap_or(self.local_enr.id); - self.lookup_with(node_id, Some(tx)); + self.lookup_with(node_id, tx); } Discv4Command::Updates(tx) => { let rx = self.update_stream(); @@ -998,7 +1010,7 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i /// The commands sent from the frontend to the service enum Discv4Command { - Lookup { node_id: Option, tx: NodeRecordSender }, + Lookup { node_id: Option, tx: Option }, Updates(OneshotSender>), } @@ -1033,6 +1045,15 @@ struct LookupTargetRotator { counter: usize, } +// === impl LookupTargetRotator === + +impl LookupTargetRotator { + /// Returns a rotator that always returns the local target. + fn local_only() -> Self { + Self { interval: 1, counter: 0 } + } +} + impl Default for LookupTargetRotator { fn default() -> Self { Self { @@ -1253,6 +1274,25 @@ mod tests { }; use tracing_test::traced_test; + #[test] + fn test_local_rotator() { + let id = NodeId::random(); + let mut rotator = LookupTargetRotator::local_only(); + assert_eq!(rotator.next(&id), id); + assert_eq!(rotator.next(&id), id); + } + + #[test] + fn test_rotator() { + let id = NodeId::random(); + let mut rotator = LookupTargetRotator::default(); + assert_eq!(rotator.next(&id), id); + assert_ne!(rotator.next(&id), id); + assert_ne!(rotator.next(&id), id); + assert_ne!(rotator.next(&id), id); + assert_eq!(rotator.next(&id), id); + } + #[tokio::test] #[traced_test] async fn test_pending_ping() { @@ -1294,4 +1334,17 @@ mod tests { println!("total peers {}", table.len()); } } + + #[tokio::test(flavor = "multi_thread")] + #[traced_test] + async fn test_service_commands() { + let config = Discv4Config::builder().build(); + let (discv4, mut service) = create_discv4_with_config(config).await; + + service.lookup_self(); + + let _handle = service.spawn(); + discv4.send_lookup_self(); + let _ = discv4.lookup_self().await; + } } diff --git a/crates/net/discv4/src/node.rs b/crates/net/discv4/src/node.rs index 3a7f4016b..fa9dbab3d 100644 --- a/crates/net/discv4/src/node.rs +++ b/crates/net/discv4/src/node.rs @@ -4,6 +4,7 @@ use generic_array::GenericArray; use reth_primitives::keccak256; use reth_rlp::{Decodable, DecodeError, Encodable}; use reth_rlp_derive::RlpEncodable; +use secp256k1::{SecretKey, SECP256K1}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, @@ -48,6 +49,13 @@ pub struct NodeRecord { } impl NodeRecord { + /// Derive the [`NodeRecord`] from the secret key and addr + pub fn from_secret_key(addr: SocketAddr, sk: &SecretKey) -> Self { + let pk = secp256k1::PublicKey::from_secret_key(SECP256K1, sk); + let id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + Self::new(addr, id) + } + /// Creates a new record #[allow(unused)] pub(crate) fn new(addr: SocketAddr, id: NodeId) -> Self { diff --git a/crates/net/discv4/src/proto.rs b/crates/net/discv4/src/proto.rs index 81036a628..472346f52 100644 --- a/crates/net/discv4/src/proto.rs +++ b/crates/net/discv4/src/proto.rs @@ -478,7 +478,7 @@ mod tests { let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); let (encoded, _) = msg.encode(&secret_key); - assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg); + assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {msg:?}", encoded.len()); let mut neighbours = Neighbours { nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng)) @@ -489,7 +489,7 @@ mod tests { neighbours.nodes.push(rng_ipv4_record(&mut rng)); let msg = Message::Neighbours(neighbours); let (encoded, _) = msg.encode(&secret_key); - assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg); + assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {msg:?}", encoded.len()); } } diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 743f2ecf3..29b4e9497 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -29,6 +29,11 @@ impl UnauthedEthStream { pub fn new(inner: S) -> Self { Self { inner } } + + /// Consumes the type and returns the wrapped stream + pub fn into_inner(self) -> S { + self.inner + } } impl UnauthedEthStream diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index 9c5ec5f55..583a0913b 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -19,4 +19,4 @@ mod pinger; pub mod types; pub use types::*; -pub use ethstream::EthStream; +pub use ethstream::{EthStream, UnauthedEthStream}; diff --git a/crates/net/eth-wire/src/types/blocks.rs b/crates/net/eth-wire/src/types/blocks.rs index ec535a0e9..935b7f559 100644 --- a/crates/net/eth-wire/src/types/blocks.rs +++ b/crates/net/eth-wire/src/types/blocks.rs @@ -63,7 +63,7 @@ impl Decodable for BlockHashOrNumber { /// traversing towards the latest block. /// /// If the [`skip`](#structfield.skip) field is non-zero, the peer must skip that amount of headers -/// in the the direction specified by [`reverse`](#structfield.reverse). +/// in the direction specified by [`reverse`](#structfield.reverse). #[derive(Copy, Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] pub struct GetBlockHeaders { /// The block number or hash that the peer should start returning headers from. diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml new file mode 100644 index 000000000..35f07d60a --- /dev/null +++ b/crates/net/network/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "reth-network" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = """ +Ethereum network support +""" + +[dependencies] +# reth +reth-interfaces = { path = "../../interfaces" } +reth-primitives = { path = "../../primitives" } +reth-discv4 = { path = "../discv4" } +reth-eth-wire = { path = "../eth-wire" } +reth-ecies = { path = "../ecies" } +reth-rlp = { path = "../../common/rlp", features = ["smol_str"] } +reth-rlp-derive = { path = "../../common/rlp-derive" } +reth-transaction-pool = { path = "../../transaction-pool" } + +# async/futures +futures = "0.3" +pin-project = "1.0" +tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-thread", "time"] } +tokio-stream = "0.1" + +# misc +aquamarine = "0.1" # docs +tracing = "0.1" +fnv = "1.0" +thiserror = "1.0" +parking_lot = "0.12" +async-trait = "0.1" +bytes = "1.2" +smol_str = { version = "0.1", default-features = false } +either = "1.8" + +secp256k1 = { version = "0.24", features = [ + "global-context", + "rand-std", + "recovery", +] } + +[dev-dependencies] +rand = "0.8" diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs new file mode 100644 index 000000000..c5032b81f --- /dev/null +++ b/crates/net/network/src/config.rs @@ -0,0 +1,129 @@ +use crate::{peers::PeersConfig, session::SessionsConfig}; +use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT}; +use reth_eth_wire::forkid::ForkId; +use reth_primitives::Chain; +use secp256k1::SecretKey; +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, +}; + +/// All network related initialization settings. +pub struct NetworkConfig { + /// The client type that can interact with the chain. + pub client: Arc, + /// The node's secret key, from which the node's identity is derived. + pub secret_key: SecretKey, + /// How to set up discovery. + pub discovery_v4_config: Discv4Config, + /// Address to use for discovery + pub discovery_addr: SocketAddr, + /// Address to listen for incoming connections + pub listener_addr: SocketAddr, + /// How to instantiate peer manager. + pub peers_config: PeersConfig, + /// How to configure the [`SessionManager`] + pub sessions_config: SessionsConfig, + /// A fork identifier as defined by EIP-2124. + /// Serves as the chain compatibility identifier. + pub fork_id: Option, + /// The id of the network + pub chain: Chain, +} + +// === impl NetworkConfig === + +impl NetworkConfig { + /// Create a new instance with all mandatory fields set, rest is field with defaults. + pub fn new(client: Arc, secret_key: SecretKey) -> Self { + Self::builder(client, secret_key).build() + } + + /// Convenience method for creating the corresponding builder type + pub fn builder(client: Arc, secret_key: SecretKey) -> NetworkConfigBuilder { + NetworkConfigBuilder::new(client, secret_key) + } + + /// Sets the config to use for the discovery v4 protocol. + pub fn set_discovery_v4(mut self, discovery_config: Discv4Config) -> Self { + self.discovery_v4_config = discovery_config; + self + } + + /// Sets the address for the incoming connection listener. + pub fn set_listener_addr(mut self, listener_addr: SocketAddr) -> Self { + self.listener_addr = listener_addr; + self + } +} + +/// Builder for [`NetworkConfig`](struct.NetworkConfig.html). +pub struct NetworkConfigBuilder { + /// The client type that can interact with the chain. + client: Arc, + /// The node's secret key, from which the node's identity is derived. + secret_key: SecretKey, + /// How to set up discovery. + discovery_v4_builder: Discv4ConfigBuilder, + /// Address to use for discovery + discovery_addr: Option, + /// Listener for incoming connections + listener_addr: Option, + /// How to instantiate peer manager. + peers_config: Option, + /// How to configure the sessions manager + sessions_config: Option, + + fork_id: Option, + + chain: Chain, +} + +// === impl NetworkConfigBuilder === + +#[allow(missing_docs)] +impl NetworkConfigBuilder { + pub fn new(client: Arc, secret_key: SecretKey) -> Self { + Self { + client, + secret_key, + discovery_v4_builder: Default::default(), + discovery_addr: None, + listener_addr: None, + peers_config: None, + sessions_config: None, + fork_id: None, + chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet), + } + } + + /// Consumes the type and creates the actual [`NetworkConfig`] + pub fn build(self) -> NetworkConfig { + let Self { + client, + secret_key, + discovery_v4_builder, + discovery_addr, + listener_addr, + peers_config, + sessions_config, + fork_id, + chain, + } = self; + NetworkConfig { + client, + secret_key, + discovery_v4_config: discovery_v4_builder.build(), + discovery_addr: discovery_addr.unwrap_or_else(|| { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) + }), + listener_addr: listener_addr.unwrap_or_else(|| { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) + }), + peers_config: peers_config.unwrap_or_default(), + sessions_config: sessions_config.unwrap_or_default(), + fork_id, + chain, + } + } +} diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs new file mode 100644 index 000000000..65001938d --- /dev/null +++ b/crates/net/network/src/discovery.rs @@ -0,0 +1,152 @@ +//! Discovery support for the network. + +use crate::{error::NetworkError, NodeId}; +use futures::StreamExt; +use reth_discv4::{Discv4, Discv4Config, NodeRecord, TableUpdate}; +use secp256k1::SecretKey; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + net::SocketAddr, + task::{Context, Poll}, +}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; + +/// An abstraction over the configured discovery protocol. +/// +/// Listens for new discovered nodes and emits events for discovered nodes and their address. +pub struct Discovery { + /// All nodes discovered via discovery protocol. + /// + /// These nodes can be ephemeral and are updated via the discovery protocol. + discovered_nodes: HashMap, + /// Local ENR of the discovery service. + local_enr: NodeRecord, + /// Handler to interact with the Discovery v4 service + discv4: Discv4, + /// All updates from the discv4 service. + discv4_updates: ReceiverStream, + /// The initial config for the discv4 service + dsicv4_config: Discv4Config, + /// Buffered events until polled. + queued_events: VecDeque, + /// The handle to the spawned discv4 service + _discv4_service: JoinHandle<()>, +} + +impl Discovery { + /// Spawns the discovery service. + /// + /// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener + /// channel to receive all discovered nodes. + pub async fn new( + discovery_addr: SocketAddr, + sk: SecretKey, + dsicv4_config: Discv4Config, + ) -> Result { + let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk); + let (discv4, mut discv4_service) = + Discv4::bind(discovery_addr, local_enr, sk, dsicv4_config.clone()) + .await + .map_err(NetworkError::Discovery)?; + let discv4_updates = discv4_service.update_stream(); + + // spawn the service + let _discv4_service = discv4_service.spawn(); + + Ok(Self { + local_enr, + discv4, + discv4_updates, + dsicv4_config, + _discv4_service, + discovered_nodes: Default::default(), + queued_events: Default::default(), + }) + } + + /// Returns the id with which the local identifies itself in the network + pub(crate) fn local_id(&self) -> NodeId { + self.local_enr.id + } + + /// Manually adds an address to the set. + pub(crate) fn add_known_address(&mut self, node_id: NodeId, addr: SocketAddr) { + self.on_discv4_update(TableUpdate::Added(NodeRecord { + address: addr.ip(), + tcp_port: addr.port(), + udp_port: addr.port(), + id: node_id, + })) + } + + /// Returns all nodes we know exist in the network. + pub fn known_nodes(&mut self) -> &HashMap { + &self.discovered_nodes + } + + fn on_discv4_update(&mut self, update: TableUpdate) { + match update { + TableUpdate::Added(node) => { + let id = node.id; + let addr = node.tcp_addr(); + match self.discovered_nodes.entry(id) { + Entry::Occupied(_entry) => {} + Entry::Vacant(entry) => { + entry.insert(addr); + self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr)) + } + } + } + TableUpdate::Removed(node) => { + self.discovered_nodes.remove(&node); + } + TableUpdate::Batch(updates) => { + for update in updates { + self.on_discv4_update(update); + } + } + } + } + + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + // Drain all buffered events first + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event) + } + + while let Poll::Ready(Some(update)) = self.discv4_updates.poll_next_unpin(cx) { + self.on_discv4_update(update) + } + + if self.queued_events.is_empty() { + return Poll::Pending + } + } + // drain the update stream + } +} + +/// Events produced by the [`Discovery`] manager. +pub enum DiscoveryEvent { + /// A new node was discovered + Discovered(NodeId, SocketAddr), +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::thread_rng; + use secp256k1::SECP256K1; + use std::net::{Ipv4Addr, SocketAddrV4}; + + #[tokio::test(flavor = "multi_thread")] + async fn test_discovery_setup() { + let mut rng = thread_rng(); + let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); + let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let _discovery = + Discovery::new(discovery_addr, secret_key, Default::default()).await.unwrap(); + } +} diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs new file mode 100644 index 000000000..45871abf4 --- /dev/null +++ b/crates/net/network/src/error.rs @@ -0,0 +1,12 @@ +//! Possible errors when interacting with the network. + +/// All error variants for the network +#[derive(Debug, thiserror::Error)] +pub enum NetworkError { + /// General IO error. + #[error(transparent)] + Io(#[from] std::io::Error), + /// IO error when creating the discovery service + #[error("Failed to launch discovery service: {0}")] + Discovery(std::io::Error), +} diff --git a/crates/net/network/src/fetch.rs b/crates/net/network/src/fetch.rs new file mode 100644 index 000000000..3113c9703 --- /dev/null +++ b/crates/net/network/src/fetch.rs @@ -0,0 +1,186 @@ +//! Fetch data from the network. + +use crate::{message::RequestResult, NodeId}; +use futures::StreamExt; +use reth_eth_wire::{BlockBody, EthMessage}; +use reth_interfaces::p2p::headers::client::HeadersRequest; +use reth_primitives::{Header, H256, U256}; +use std::{ + collections::{HashMap, VecDeque}, + task::{Context, Poll}, + time::Instant, +}; +use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// Manages data fetching operations. +/// +/// This type is hooked into the staged sync pipeline and delegates download request to available +/// peers and sends the response once ready. +pub struct StateFetcher { + /// Currently active [`GetBlockHeaders`] requests + inflight_headers_requests: HashMap>>>, + /// The list of available peers for requests. + peers: HashMap, + /// Requests queued for processing + queued_requests: VecDeque, + /// Receiver for new incoming download requests + download_requests_rx: UnboundedReceiverStream, + /// Sender for download requests, used to detach a [`HeadersDownloader`] + download_requests_tx: UnboundedSender, +} + +// === impl StateSyncer === + +impl StateFetcher { + /// Invoked when connected to a new peer. + pub(crate) fn new_connected_peer( + &mut self, + _node_id: NodeId, + _best_hash: H256, + _best_number: U256, + ) { + } + + /// Returns the next action to return + fn poll_action(&mut self) -> Option { + // TODO find matching peers + + // if let Some(request) = self.queued_requests.pop_front() { + // if let Some(action) = self.on_download_request(request) { + // return Poll::Ready(action) + // } + // } + None + } + + fn on_download_request(&mut self, request: DownloadRequest) -> Option { + match request { + DownloadRequest::GetBlockHeaders { request: _, response: _ } => {} + DownloadRequest::GetBlockBodies { .. } => {} + } + None + } + + /// Advance the state the syncer + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // drain buffered actions first + if let Some(action) = self.poll_action() { + return Poll::Ready(action) + } + + loop { + // poll incoming requests + match self.download_requests_rx.poll_next_unpin(cx) { + Poll::Ready(Some(request)) => { + if let Some(action) = self.on_download_request(request) { + return Poll::Ready(action) + } + } + Poll::Ready(None) => { + unreachable!("channel can't close") + } + Poll::Pending => break, + } + } + + if self.queued_requests.is_empty() { + return Poll::Pending + } + + Poll::Pending + } + + /// Called on a `GetBlockHeaders` response from a peer + pub(crate) fn on_block_headers_response( + &mut self, + _from: NodeId, + _msg: RequestResult>, + ) { + } + + /// Returns a new [`HeadersDownloader`] that can send requests to this type + pub(crate) fn headers_downloader(&self) -> HeadersDownloader { + HeadersDownloader { request_tx: self.download_requests_tx.clone() } + } +} + +impl Default for StateFetcher { + fn default() -> Self { + let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); + Self { + inflight_headers_requests: Default::default(), + peers: Default::default(), + queued_requests: Default::default(), + download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), + download_requests_tx, + } + } +} + +/// Front-end API for downloading headers. +#[derive(Debug)] +pub struct HeadersDownloader { + /// Sender half of the request channel. + request_tx: UnboundedSender, +} + +// === impl HeadersDownloader === + +impl HeadersDownloader { + /// Sends a `GetBlockHeaders` request to an available peer. + pub async fn get_block_headers(&self, request: HeadersRequest) -> RequestResult> { + let (response, rx) = oneshot::channel(); + self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?; + rx.await? + } +} + +/// Represents a connected peer +struct Peer { + /// Identifier for requests. + request_id: u64, + /// The state this peer currently resides in. + state: PeerState, + /// Best known hash that the peer has + best_hash: H256, + /// Best known number the peer has. + best_number: U256, +} + +/// Tracks the state of an individual peer +enum PeerState { + /// Peer is currently not handling requests and is available. + Idle, + /// Peer is handling a `GetBlockHeaders` request. + GetBlockHeaders, +} + +/// A request that waits for a response from the network so it can send it back through the response +/// channel. +struct Request { + request: Req, + response: oneshot::Sender, + started: Instant, +} + +/// Requests that can be sent to the Syncer from a [`HeadersDownloader`] +enum DownloadRequest { + /// Download the requested headers and send response through channel + GetBlockHeaders { + request: HeadersRequest, + response: oneshot::Sender>>, + }, + /// Download the requested headers and send response through channel + GetBlockBodies { request: Vec, response: oneshot::Sender>> }, +} + +/// An action the syncer can emit. +pub(crate) enum FetchAction { + /// Dispatch an eth request to the given peer. + EthRequest { + node_id: NodeId, + /// The request to send + request: EthMessage, + }, +} diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs new file mode 100644 index 000000000..05ad4c2d7 --- /dev/null +++ b/crates/net/network/src/lib.rs @@ -0,0 +1,38 @@ +#![warn(missing_docs)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] +// TODO remove later +#![allow(dead_code)] + +//! reth P2P networking. +//! +//! Ethereum's networking protocol is specified in [devp2p](https://github.com/ethereum/devp2p). +//! +//! In order for a node to join the ethereum p2p network it needs to know what nodes are already +//! port of that network. This includes public identities (public key) and addresses (where to reach +//! them). + +mod config; +mod discovery; +pub mod error; +mod fetch; +mod listener; +mod manager; +mod message; +mod network; +mod peers; +mod session; +mod state; +mod swarm; +mod transactions; + +/// Identifier for a unique node +pub type NodeId = reth_discv4::NodeId; + +pub use config::NetworkConfig; +pub use manager::NetworkManager; +pub use network::NetworkHandle; +pub use peers::PeersConfig; diff --git a/crates/net/network/src/listener.rs b/crates/net/network/src/listener.rs new file mode 100644 index 000000000..de152cf85 --- /dev/null +++ b/crates/net/network/src/listener.rs @@ -0,0 +1,128 @@ +//! Contains connection-oriented interfaces. + +use futures::{ready, Stream}; + +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::net::{TcpListener, TcpStream}; + +/// A tcp connection listener. +/// +/// Listens for incoming connections. +#[must_use = "Transport does nothing unless polled."] +#[pin_project::pin_project] +#[derive(Debug)] +pub struct ConnectionListener { + /// Local address of the listener stream. + local_address: SocketAddr, + /// The active tcp listener for incoming connections. + #[pin] + incoming: TcpListenerStream, +} + +impl ConnectionListener { + /// Creates a new [`TcpListener`] that listens for incoming connections. + pub async fn bind(addr: SocketAddr) -> io::Result { + let listener = TcpListener::bind(addr).await?; + let local_addr = listener.local_addr()?; + Ok(Self::new(listener, local_addr)) + } + + /// Creates a new connection listener stream. + pub(crate) fn new(listener: TcpListener, local_address: SocketAddr) -> Self { + Self { local_address, incoming: TcpListenerStream { inner: listener } } + } + + /// Polls the type to make progress. + pub fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match ready!(this.incoming.poll_next(cx)) { + Some(Ok((stream, remote_addr))) => { + Poll::Ready(ListenerEvent::Incoming { stream, remote_addr }) + } + Some(Err(err)) => Poll::Ready(ListenerEvent::Error(err)), + None => { + Poll::Ready(ListenerEvent::ListenerClosed { local_address: *this.local_address }) + } + } + } + + /// Returns the socket address this listener listens on. + pub fn local_address(&self) -> SocketAddr { + self.local_address + } +} + +/// Event type produced by the [`Transport`]. +pub enum ListenerEvent { + /// Received a new incoming. + Incoming { + /// Accepted connection + stream: TcpStream, + /// Address of the remote peer. + remote_addr: SocketAddr, + }, + /// Returned when the underlying connection listener has been closed. + /// + /// This is the case if the [`TcpListenerStream`] should ever return `None` + ListenerClosed { + /// Address of the closed listener. + local_address: SocketAddr, + }, + /// Encountered an error when accepting a connection. + /// + /// This is non-fatal error as the listener continues to listen for new connections to accept. + Error(io::Error), +} + +/// A stream of incoming [`TcpStream`]s. +#[derive(Debug)] +struct TcpListenerStream { + /// listener for incoming connections. + inner: TcpListener, +} + +impl Stream for TcpListenerStream { + type Item = io::Result<(TcpStream, SocketAddr)>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok(conn)) => Poll::Ready(Some(Ok(conn))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::pin_mut; + use std::net::{Ipv4Addr, SocketAddrV4}; + use tokio::macros::support::poll_fn; + + #[tokio::test(flavor = "multi_thread")] + async fn test_incoming_listener() { + let listener = + ConnectionListener::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) + .await + .unwrap(); + let local_addr = listener.local_address(); + + tokio::task::spawn(async move { + pin_mut!(listener); + match poll_fn(|cx| listener.as_mut().poll(cx)).await { + ListenerEvent::Incoming { .. } => {} + _ => { + panic!("unexpected event") + } + } + }); + + let _ = TcpStream::connect(local_addr).await.unwrap(); + } +} diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs new file mode 100644 index 000000000..22e618842 --- /dev/null +++ b/crates/net/network/src/manager.rs @@ -0,0 +1,324 @@ +//! High level network management. +//! +//! The [`Network`] contains the state of the network as a whole. It controls how connections are +//! handled and keeps track of connections to peers. +//! +//! ## Capabilities +//! +//! The network manages peers depending on their announced capabilities via their RLPx sessions. Most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md)(`eth`). +//! +//! ## Overview +//! +//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is +//! made up of peer-to-peer connections between nodes that are available on the same network. +//! Responsible for peer discovery is ethereum's discovery protocol (discv4, discv5). If the address +//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections +//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the RLPx session. + +use crate::{ + config::NetworkConfig, + discovery::Discovery, + error::NetworkError, + listener::ConnectionListener, + message::{Capabilities, CapabilityMessage}, + network::{NetworkHandle, NetworkHandleMessage}, + peers::PeersManager, + session::SessionManager, + state::NetworkState, + swarm::{Swarm, SwarmEvent}, + NodeId, +}; +use futures::{Future, StreamExt}; +use parking_lot::Mutex; +use reth_eth_wire::EthMessage; +use reth_interfaces::provider::BlockProvider; +use std::{ + net::SocketAddr, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{error, trace}; + +/// Manages the _entire_ state of the network. +/// +/// This is an endless [`Future`] that consistently drives the state of the entire network forward. +/// +/// The [`NetworkManager`] is the container type for all parts involved with advancing the network. +#[cfg_attr(doc, aquamarine::aquamarine)] +/// ```mermaid +/// graph TB +/// handle(NetworkHandle) +/// events(NetworkEvents) +/// subgraph NetworkManager +/// direction LR +/// subgraph Swarm +/// direction TB +/// B1[(Peer Sessions)] +/// B2[(Connection Lister)] +/// B3[(State)] +/// end +/// end +/// handle <--> |request/response channel| NetworkManager +/// NetworkManager --> |Network events| events +/// ``` +#[must_use = "The NetworkManager does nothing unless polled"] +pub struct NetworkManager { + /// The type that manages the actual network part, which includes connections. + swarm: Swarm, + /// Underlying network handle that can be shared. + handle: NetworkHandle, + /// Receiver half of the command channel set up between this type and the [`NetworkHandle`] + from_handle_rx: UnboundedReceiverStream, + /// Handles block imports. + block_import_sink: (), + /// The address of this node that listens for incoming connections. + listener_address: Arc>, + /// All listeners for [`Network`] events. + event_listeners: NetworkEventListeners, + /// Tracks the number of active session (connected peers). + /// + /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`] + /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. + num_active_peers: Arc, + /// Local copy of the `NodeId` of the local node. + local_node_id: NodeId, +} + +// === impl NetworkManager === + +impl NetworkManager +where + C: BlockProvider, +{ + /// Creates the manager of a new network. + /// + /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the + /// state of the entire network. + pub async fn new(config: NetworkConfig) -> Result { + let NetworkConfig { + client, + secret_key, + discovery_v4_config, + discovery_addr, + listener_addr, + peers_config, + sessions_config, + .. + } = config; + + let peers_manger = PeersManager::new(peers_config); + let peers_handle = peers_manger.handle(); + + let incoming = ConnectionListener::bind(listener_addr).await?; + let listener_address = Arc::new(Mutex::new(incoming.local_address())); + + let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?; + // need to retrieve the addr here since provided port could be `0` + let local_node_id = discovery.local_id(); + + // TODO this should also need sk for encrypted sessions + let sessions = SessionManager::new(secret_key, sessions_config); + let state = NetworkState::new(client, discovery, peers_manger); + + let swarm = Swarm::new(incoming, sessions, state); + + let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel(); + + let num_active_peers = Arc::new(AtomicUsize::new(0)); + let handle = NetworkHandle::new( + Arc::clone(&num_active_peers), + Arc::clone(&listener_address), + to_manager_tx, + local_node_id, + peers_handle, + ); + + Ok(Self { + swarm, + handle, + from_handle_rx: UnboundedReceiverStream::new(from_handle_rx), + block_import_sink: (), + listener_address, + event_listeners: Default::default(), + num_active_peers, + local_node_id, + }) + } + + /// Returns the [`NetworkHandle`] that can be cloned and shared. + /// + /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`] + pub fn handle(&self) -> &NetworkHandle { + &self.handle + } + + /// Event hook for an unexpected message from the peer. + fn on_invalid_message( + &self, + _node_id: NodeId, + _capabilities: Arc, + _message: CapabilityMessage, + ) { + // TODO: disconnect? + } + + /// Handles a received [`CapabilityMessage`] from the peer. + fn on_capability_message(&mut self, _node_id: NodeId, msg: CapabilityMessage) { + match msg { + CapabilityMessage::Eth(eth) => { + match eth { + EthMessage::Status(_) => {} + EthMessage::NewBlockHashes(_) => { + // update peer's state, to track what blocks this peer has seen + } + EthMessage::NewBlock(_) => { + // emit new block and track that the peer knows this block + } + EthMessage::Transactions(_) => { + // need to emit this as event/send to tx handler + } + EthMessage::NewPooledTransactionHashes(_) => { + // need to emit this as event/send to tx handler + } + + // TODO: should remove the response types here, as they are handled separately + EthMessage::GetBlockHeaders(_) => {} + EthMessage::BlockHeaders(_) => {} + EthMessage::GetBlockBodies(_) => {} + EthMessage::BlockBodies(_) => {} + EthMessage::GetPooledTransactions(_) => {} + EthMessage::PooledTransactions(_) => {} + EthMessage::GetNodeData(_) => {} + EthMessage::NodeData(_) => {} + EthMessage::GetReceipts(_) => {} + EthMessage::Receipts(_) => {} + } + } + CapabilityMessage::Other(_) => { + // other subprotocols + } + } + } + + /// Handler for received messages from a handle + fn on_handle_message(&mut self, msg: NetworkHandleMessage) { + match msg { + NetworkHandleMessage::EventListener(tx) => { + self.event_listeners.listeners.push(tx); + } + NetworkHandleMessage::NewestBlock(_, _) => {} + _ => {} + } + } +} + +impl Future for NetworkManager +where + C: BlockProvider, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // process incoming messages from a handle + loop { + match this.from_handle_rx.poll_next_unpin(cx) { + Poll::Pending => break, + Poll::Ready(None) => { + // This is only possible if the channel was deliberately closed since we always + // have an instance of `NetworkHandle` + error!("network message channel closed."); + return Poll::Ready(()) + } + Poll::Ready(Some(msg)) => this.on_handle_message(msg), + }; + } + + // advance the swarm + while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) { + // handle event + match event { + SwarmEvent::CapabilityMessage { node_id, message } => { + this.on_capability_message(node_id, message) + } + SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => { + this.on_invalid_message(node_id, capabilities, message) + } + SwarmEvent::TcpListenerClosed { remote_addr } => { + trace!(?remote_addr, target = "net", "TCP listener closed."); + } + SwarmEvent::TcpListenerError(err) => { + trace!(?err, target = "net", "TCP connection error."); + } + SwarmEvent::IncomingTcpConnection { remote_addr, .. } => { + trace!(?remote_addr, target = "net", "Incoming connection"); + } + SwarmEvent::OutgoingTcpConnection { remote_addr } => { + trace!(?remote_addr, target = "net", "Starting outbound connection."); + } + SwarmEvent::SessionEstablished { node_id, remote_addr } => { + let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; + trace!( + ?remote_addr, + ?node_id, + ?total_active, + target = "net", + "Session established" + ); + } + SwarmEvent::SessionClosed { node_id, remote_addr } => { + let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; + trace!( + ?remote_addr, + ?node_id, + ?total_active, + target = "net", + "Session disconnected" + ); + } + SwarmEvent::IncomingPendingSessionClosed { .. } => {} + SwarmEvent::OutgoingPendingSessionClosed { .. } => {} + SwarmEvent::OutgoingConnectionError { .. } => {} + } + } + + todo!() + } +} + +/// Events emitted by the network that are of interest for subscribers. +#[derive(Debug, Clone)] +pub enum NetworkEvent { + EthMessage { node_id: NodeId, message: EthMessage }, +} + +/// Bundles all listeners for [`NetworkEvent`]s. +#[derive(Default)] +struct NetworkEventListeners { + /// All listeners for an event + listeners: Vec>, +} + +// === impl NetworkEventListeners === + +impl NetworkEventListeners { + /// Sends the event to all listeners. + /// + /// Remove channels that got closed. + fn send(&mut self, event: NetworkEvent) { + self.listeners.retain(|listener| { + let open = listener.send(event.clone()).is_ok(); + if !open { + trace!(target = "net", "event listener channel closed",); + } + open + }); + } +} diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs new file mode 100644 index 000000000..d98db200f --- /dev/null +++ b/crates/net/network/src/message.rs @@ -0,0 +1,143 @@ +//! Capability messaging +//! +//! An RLPx stream is multiplexed via the prepended message-id of a framed message. +//! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, + +use bytes::{BufMut, Bytes}; +use reth_eth_wire::{BlockHeaders, EthMessage, GetBlockHeaders}; +use reth_rlp::{Decodable, DecodeError, Encodable}; +use reth_rlp_derive::{RlpDecodable, RlpEncodable}; +use smol_str::SmolStr; +use tokio::sync::{mpsc, oneshot}; + +/// Result alias for result of a request. +pub type RequestResult = Result; + +/// Error variants that can happen when sending requests to a session. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum RequestError { + #[error("Closed channel.")] + ChannelClosed, + #[error("Not connected to the node.")] + NotConnected, + #[error("Capability Message is not supported by remote peer.")] + UnsupportedCapability, + #[error("Network error: {0}")] + Io(String), +} + +impl From> for RequestError { + fn from(_: mpsc::error::SendError) -> Self { + RequestError::ChannelClosed + } +} + +impl From for RequestError { + fn from(_: oneshot::error::RecvError) -> Self { + RequestError::ChannelClosed + } +} + +/// Represents all capabilities of a node. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Capabilities { + /// All Capabilities and their versions + inner: Vec, + eth_66: bool, + eth_67: bool, +} + +impl Capabilities { + /// Whether this peer supports eth v66 protocol. + #[inline] + pub fn supports_eth_v66(&self) -> bool { + self.eth_66 + } + + /// Whether this peer supports eth v67 protocol. + #[inline] + pub fn supports_eth_v67(&self) -> bool { + self.eth_67 + } +} + +impl Encodable for Capabilities { + fn encode(&self, out: &mut dyn BufMut) { + self.inner.encode(out) + } +} + +impl Decodable for Capabilities { + fn decode(buf: &mut &[u8]) -> Result { + let inner = Vec::::decode(buf)?; + + Ok(Self { + eth_66: inner.iter().any(Capability::is_eth_v66), + eth_67: inner.iter().any(Capability::is_eth_v67), + inner, + }) + } +} + +/// Represents an announced Capability in the `Hello` message +#[derive(Debug, Clone, Eq, PartialEq, RlpDecodable, RlpEncodable)] +pub struct Capability { + /// Name of the Capability + pub name: SmolStr, + /// The version of the capability + pub version: u64, +} + +// === impl Capability === + +impl Capability { + /// Whether this is eth v66 protocol. + #[inline] + pub fn is_eth_v66(&self) -> bool { + self.name == "eth" && self.version == 66 + } + + /// Whether this is eth v67. + #[inline] + pub fn is_eth_v67(&self) -> bool { + self.name == "eth" && self.version == 67 + } +} + +/// A Capability message consisting of the message-id and the payload +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RawCapabilityMessage { + /// Identifier of the message. + pub id: usize, + /// Actual payload + pub payload: Bytes, +} + +/// Various protocol related event types bubbled up from a session that need to be handled by the +/// network. +#[derive(Debug)] +pub enum CapabilityMessage { + /// Eth sub-protocol message. + Eth(EthMessage), + /// Any other capability message. + Other(RawCapabilityMessage), +} + +/// Protocol related request messages that expect a response +#[derive(Debug)] +pub enum CapabilityRequest { + /// Request Block headers from the peer. + /// + /// The response should be sent through the channel. + GetBlockHeaders { + request: GetBlockHeaders, + response: oneshot::Sender>, + }, +} + +/// The actual response object +#[derive(Debug)] +pub enum CapabilityResponse { + GetBlockHeaders(RequestResult), +} diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs new file mode 100644 index 000000000..76b903460 --- /dev/null +++ b/crates/net/network/src/network.rs @@ -0,0 +1,73 @@ +use crate::{manager::NetworkEvent, peers::PeersHandle, NodeId}; +use parking_lot::Mutex; +use reth_primitives::{H256, U256}; +use std::{ + net::SocketAddr, + sync::{atomic::AtomicUsize, Arc}, +}; +use tokio::sync::{mpsc, mpsc::UnboundedSender}; + +/// A _shareable_ network frontend. Used to interact with the network. +/// +/// See also [`NetworkManager`](crate::NetworkManager). +#[derive(Clone)] +pub struct NetworkHandle { + /// The Arc'ed delegate that contains the state. + inner: Arc, +} + +// === impl NetworkHandle === + +impl NetworkHandle { + /// Creates a single new instance. + pub(crate) fn new( + num_active_peers: Arc, + listener_address: Arc>, + to_manager_tx: UnboundedSender, + local_node_id: NodeId, + peers: PeersHandle, + ) -> Self { + let inner = NetworkInner { + num_active_peers, + to_manager_tx, + listener_address, + local_node_id, + peers, + }; + Self { inner: Arc::new(inner) } + } + + fn manager(&self) -> &UnboundedSender { + &self.inner.to_manager_tx + } + + /// Creates a new [`NetworkEvent`] listener channel. + pub fn event_listener(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); + rx + } +} + +struct NetworkInner { + /// Number of active peer sessions the node's currently handling. + num_active_peers: Arc, + /// Sender half of the message channel to the [`NetworkManager`]. + to_manager_tx: UnboundedSender, + /// The local address that accepts incoming connections. + listener_address: Arc>, + /// The identifier used by this node. + local_node_id: NodeId, + /// Access to the all the nodes + peers: PeersHandle, // TODO need something to access +} + +/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). +pub(crate) enum NetworkHandleMessage { + /// Add a new listener for [`NetworkEvent`]. + EventListener(UnboundedSender), + /// Broadcast event to announce a new block to all nodes. + AnnounceBlock, + /// Returns the newest imported block by the network. + NewestBlock(H256, U256), +} diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs new file mode 100644 index 000000000..44fa6ad50 --- /dev/null +++ b/crates/net/network/src/peers.rs @@ -0,0 +1,175 @@ +use reth_discv4::NodeId; + +use futures::StreamExt; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + net::SocketAddr, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{ + sync::mpsc, + time::{Instant, Interval}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A communication channel to the [`PeersManager`] to apply changes to the peer set. +pub struct PeersHandle { + manager_tx: mpsc::UnboundedSender, +} + +/// Maintains the state of _all_ the peers known to the network. +/// +/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`]. +/// From this type, connections to peers are established or disconnected, see [`PeerAction`]. +/// +/// The [`PeersManager`] will be notified on peer related changes +pub(crate) struct PeersManager { + /// All peers known to the network + peers: HashMap, + /// Copy of the receiver half, so new [`PeersHandle`] can be created on demand. + manager_tx: mpsc::UnboundedSender, + /// Receiver half of the command channel. + handle_rx: UnboundedReceiverStream, + /// Buffered actions until the manager is polled. + actions: VecDeque, + /// Interval for triggering connections if there are free slots. + refill_slots_interval: Interval, + /// Tracks current slot stats. + connection_info: ConnectionInfo, +} + +impl PeersManager { + /// Create a new instance with the given config + pub(crate) fn new(config: PeersConfig) -> Self { + let PeersConfig { refill_slots_interval, connection_info } = config; + let (manager_tx, handle_rx) = mpsc::unbounded_channel(); + Self { + peers: Default::default(), + manager_tx, + handle_rx: UnboundedReceiverStream::new(handle_rx), + actions: Default::default(), + refill_slots_interval: tokio::time::interval_at( + Instant::now() + refill_slots_interval, + refill_slots_interval, + ), + connection_info, + } + } + + /// Returns a new [`PeersHandle`] that can send commands to this type + pub(crate) fn handle(&self) -> PeersHandle { + PeersHandle { manager_tx: self.manager_tx.clone() } + } + + pub(crate) fn add_discovered_node(&mut self, node: NodeId, addr: SocketAddr) { + match self.peers.entry(node) { + Entry::Occupied(_) => {} + Entry::Vacant(entry) => { + entry.insert(Node::new(addr)); + } + } + } + + pub(crate) fn remove_discovered_node(&mut self, _node: NodeId) {} + + /// If there's capacity for new outbound connections, this will queue new + /// [`PeerAction::Connect`] actions. + fn fill_outbound_slots(&mut self) { + // This checks if there are free slots for new outbound connections available that can be + // filled + } + + /// Advances the state. + /// + /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until + /// [`PeersManager::poll_next`] is called. + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + // drain buffered actions + if let Some(action) = self.actions.pop_front() { + return Poll::Ready(action) + } + if self.refill_slots_interval.poll_tick(cx).is_ready() { + self.fill_outbound_slots(); + } + + while let Poll::Ready(Some(_cmd)) = self.handle_rx.poll_next_unpin(cx) { + // TODO handle incoming command + } + } + } +} + +/// Tracks stats about connected nodes +#[derive(Debug)] +pub struct ConnectionInfo { + /// Currently occupied slots for outbound connections. + num_outbound: usize, + /// Currently occupied slots for inbound connections. + num_inbound: usize, + /// Maximum allowed outbound connections. + max_outbound: usize, + /// Maximum allowed inbound connections. + max_inbound: usize, +} + +/// Tracks info about a single node. +struct Node { + /// Where to reach the node + addr: SocketAddr, + /// Reputation of the node. + reputation: i32, +} + +// === impl Node === + +impl Node { + fn new(addr: SocketAddr) -> Self { + Self { addr, reputation: 0 } + } +} + +/// Commands the [`PeersManager`] listens for. +pub enum PeerCommand { + Add(NodeId), + Remove(NodeId), + // TODO reputation change +} + +/// Actions the peer manager can trigger. +#[derive(Debug)] +pub enum PeerAction { + /// Start a new connection to a peer. + Connect { + /// The peer to connect to. + node_id: NodeId, + /// Where to reach the node + remote_addr: SocketAddr, + }, + /// Disconnect an existing connection. + Disconnect { node_id: NodeId }, +} + +/// Config type for initiating a [`PeersManager`] instance +#[derive(Debug)] +pub struct PeersConfig { + /// How even to recheck free slots for outbound connections + pub refill_slots_interval: Duration, + /// Restrictions on connections + pub connection_info: ConnectionInfo, +} + +impl Default for PeersConfig { + fn default() -> Self { + Self { + refill_slots_interval: Duration::from_millis(1_000), + connection_info: ConnectionInfo { + num_outbound: 0, + num_inbound: 0, + max_outbound: 70, + max_inbound: 30, + }, + } + } +} diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs new file mode 100644 index 000000000..e0509dfe7 --- /dev/null +++ b/crates/net/network/src/session/handle.rs @@ -0,0 +1,125 @@ +//! Session handles +use crate::{ + message::{Capabilities, CapabilityMessage}, + session::{Direction, SessionId}, + NodeId, +}; +use reth_ecies::{stream::ECIESStream, ECIESError}; +use std::{io, net::SocketAddr, sync::Arc, time::Instant}; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, +}; + +/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello +/// message which exchanges the `capabilities` of the peer. +/// +/// This session needs to wait until it is authenticated. +#[derive(Debug)] +pub(crate) struct PendingSessionHandle { + /// Can be used to tell the session to disconnect the connection/abort the handshake process. + pub(crate) disconnect_tx: oneshot::Sender<()>, +} + +/// An established session with a remote peer. +/// +/// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can +/// be performed: chain synchronization, block propagation and transaction exchange. +#[derive(Debug)] +pub(crate) struct ActiveSessionHandle { + /// The assigned id for this session + pub(crate) session_id: SessionId, + /// The identifier of the remote peer + pub(crate) remote_id: NodeId, + /// The timestamp when the session has been established. + pub(crate) established: Instant, + /// Announced capabilities of the peer. + pub(crate) capabilities: Arc, + /// Sender half of the command channel used send commands _to_ the spawned session + pub(crate) commands: mpsc::Sender, +} + +// === impl ActiveSessionHandle === + +impl ActiveSessionHandle { + /// Sends a disconnect command to the session. + pub(crate) fn disconnect(&self) { + // Note: we clone the sender which ensures the channel has capacity to send the message + let _ = self.commands.clone().try_send(SessionCommand::Disconnect); + } +} + +/// Events a pending session can produce. +/// +/// This represents the state changes a session can undergo until it is ready to send capability messages . +/// +/// A session starts with a `Handshake`, followed by a `Hello` message which +#[derive(Debug)] +pub(crate) enum PendingSessionEvent { + /// Initial handshake step was successful + SuccessfulHandshake { remote_addr: SocketAddr, session_id: SessionId }, + /// Represents a successful `Hello` exchange: + Hello { + session_id: SessionId, + node_id: NodeId, + capabilities: Arc, + stream: ECIESStream, + }, + /// Handshake unsuccessful, session was disconnected. + Disconnected { + remote_addr: SocketAddr, + session_id: SessionId, + direction: Direction, + error: Option, + }, + /// Thrown when unable to establish a [`TcpStream`]. + OutgoingConnectionError { + remote_addr: SocketAddr, + session_id: SessionId, + node_id: NodeId, + error: io::Error, + }, + /// Thrown when authentication via Ecies failed. + EciesAuthError { remote_addr: SocketAddr, session_id: SessionId, error: ECIESError }, +} + +/// Commands that can be sent to the spawned session. +#[derive(Debug)] +pub(crate) enum SessionCommand { + /// Disconnect the connection + Disconnect, + Message(CapabilityMessage), +} + +/// Message variants an active session can produce and send back to the +/// [`SessionManager`](crate::session::SessionManager) +#[derive(Debug)] +pub(crate) enum ActiveSessionMessage { + /// Session disconnected. + Closed { node_id: NodeId, remote_addr: SocketAddr }, + /// A session received a valid message via RLPx. + ValidMessage { + /// Identifier of the remote peer. + node_id: NodeId, + /// Message received from the peer. + message: CapabilityMessage, + }, + /// Received a message that does not match the announced capabilities of the peer. + InvalidMessage { + /// Identifier of the remote peer. + node_id: NodeId, + /// Announced capabilities of the remote peer. + capabilities: Arc, + /// Message received from the peer. + message: CapabilityMessage, + }, +} + +/// A Cloneable connection for sending messages directly to the session of a peer. +#[derive(Debug, Clone)] +pub struct PeerMessageSender { + /// id of the remote node. + pub(crate) peer: NodeId, + /// The Sender half connected to a session. + pub(crate) to_session_tx: mpsc::Sender, +} diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs new file mode 100644 index 000000000..d8ccd83ef --- /dev/null +++ b/crates/net/network/src/session/mod.rs @@ -0,0 +1,518 @@ +//! Support for handling peer sessions. +use crate::{ + message::{Capabilities, CapabilityMessage}, + session::handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + }, + NodeId, +}; +use fnv::FnvHashMap; +use futures::{future::Either, io, FutureExt, StreamExt}; +pub use handle::PeerMessageSender; +use reth_ecies::{stream::ECIESStream, ECIESError}; +use reth_eth_wire::UnauthedEthStream; +use secp256k1::{SecretKey, SECP256K1}; +use std::{ + collections::HashMap, + future::Future, + net::SocketAddr, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, + task::JoinSet, +}; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{instrument, trace, warn}; + +mod handle; + +/// Internal identifier for active sessions. +#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] +pub struct SessionId(usize); + +/// Manages a set of sessions. +#[must_use = "Session Manager must be polled to process session events."] +pub(crate) struct SessionManager { + /// Tracks the identifier for the next session. + next_id: usize, + /// The secret key used for authenticating sessions. + secret_key: SecretKey, + /// The node id of node + node_id: NodeId, + /// Size of the command buffer per session. + session_command_buffer: usize, + /// All spawned session tasks. + /// + /// Note: If dropped, the session tasks are aborted. + spawned_tasks: JoinSet<()>, + /// All pending session that are currently handshaking, exchanging `Hello`s. + /// + /// Events produced during the authentication phase are reported to this manager. Once the + /// session is authenticated, it can be moved to the `active_session` set. + pending_sessions: FnvHashMap, + /// All active sessions that are ready to exchange messages. + active_sessions: HashMap, + /// The original Sender half of the [`PendingSessionEvent`] channel. + /// + /// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will + /// get a clone of this sender half. + pending_sessions_tx: mpsc::Sender, + /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions. + pending_session_rx: ReceiverStream, + /// The original Sender half of the [`ActiveSessionEvent`] channel. + /// + /// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a + /// clone of this sender half. + active_session_tx: mpsc::Sender, + /// Receiver half that listens for [`ActiveSessionEvent`] produced by pending sessions. + active_session_rx: ReceiverStream, +} + +// === impl SessionManager === + +impl SessionManager { + /// Creates a new empty [`SessionManager`]. + pub(crate) fn new(secret_key: SecretKey, config: SessionsConfig) -> Self { + let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer); + let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); + + let pk = secret_key.public_key(SECP256K1); + let node_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + + Self { + next_id: 0, + secret_key, + node_id, + session_command_buffer: config.session_command_buffer, + spawned_tasks: Default::default(), + pending_sessions: Default::default(), + active_sessions: Default::default(), + pending_sessions_tx, + pending_session_rx: ReceiverStream::new(pending_sessions_rx), + active_session_tx, + active_session_rx: ReceiverStream::new(active_session_rx), + } + } + + /// Returns the next unique [`SessionId`]. + fn next_id(&mut self) -> SessionId { + let id = self.next_id; + self.next_id += 1; + SessionId(id) + } + + /// Spawns the given future onto a new task that is tracked in the `spawned_tasks` [`JoinSet`]. + fn spawn(&mut self, f: F) + where + F: Future + Send + 'static, + { + self.spawned_tasks.spawn(async move { f.await }); + } + + /// A incoming TCP connection was received. This starts the authentication process to turn this + /// stream into an active peer session. + /// + /// Returns an error if the configured limit has been reached. + pub(crate) fn on_incoming( + &mut self, + stream: TcpStream, + remote_addr: SocketAddr, + ) -> Result { + // TODO(mattsse): enforce limits + let session_id = self.next_id(); + let (disconnect_tx, disconnect_rx) = oneshot::channel(); + let pending_events = self.pending_sessions_tx.clone(); + self.spawn(start_pending_incoming_session( + disconnect_rx, + session_id, + stream, + pending_events, + remote_addr, + self.secret_key, + )); + + let handle = PendingSessionHandle { disconnect_tx }; + self.pending_sessions.insert(session_id, handle); + Ok(session_id) + } + + /// Starts a new pending session from the local node to the given remote node. + pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: NodeId) { + let session_id = self.next_id(); + let (disconnect_tx, disconnect_rx) = oneshot::channel(); + let pending_events = self.pending_sessions_tx.clone(); + self.spawn(start_pending_outbound_session( + disconnect_rx, + pending_events, + session_id, + remote_addr, + remote_node_id, + self.secret_key, + )); + + let handle = PendingSessionHandle { disconnect_tx }; + self.pending_sessions.insert(session_id, handle); + } + + /// Initiates a shutdown of the channel. + /// + /// This will trigger the disconnect on the session task to gracefully terminate. The result + /// will be picked up by the receiver. + pub(crate) fn disconnect(&self, node: NodeId) { + if let Some(session) = self.active_sessions.get(&node) { + session.disconnect(); + } + } + + /// This polls all the session handles and returns [`SessionEvent`]. + /// + /// Active sessions are prioritized. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // Poll events from active sessions + match self.active_session_rx.poll_next_unpin(cx) { + Poll::Pending => {} + Poll::Ready(None) => { + unreachable!("Manager holds both channel halves.") + } + Poll::Ready(Some(event)) => { + return match event { + ActiveSessionMessage::Closed { node_id, remote_addr } => { + trace!(?node_id, target = "net::session", "closed active session."); + let _ = self.active_sessions.remove(&node_id); + Poll::Ready(SessionEvent::Disconnected { node_id, remote_addr }) + } + ActiveSessionMessage::ValidMessage { node_id, message } => { + // TODO: since all messages are known they should be decoded in the session + Poll::Ready(SessionEvent::ValidMessage { node_id, message }) + } + ActiveSessionMessage::InvalidMessage { node_id, capabilities, message } => { + Poll::Ready(SessionEvent::InvalidMessage { node_id, message, capabilities }) + } + } + } + } + + // Poll the pending session event stream + loop { + let event = match self.pending_session_rx.poll_next_unpin(cx) { + Poll::Pending => break, + Poll::Ready(None) => unreachable!("Manager holds both channel halves."), + Poll::Ready(Some(event)) => event, + }; + match event { + PendingSessionEvent::SuccessfulHandshake { remote_addr, session_id } => { + trace!( + ?session_id, + ?remote_addr, + target = "net::session", + "successful handshake" + ); + } + PendingSessionEvent::Hello { + session_id, + node_id: _, + capabilities: _, + stream: _, + } => { + // move from pending to established. + let _ = self.pending_sessions.remove(&session_id); + + // TODO spawn the authenticated session + // let session = ActiveSessionHandle { + // session_id, + // remote_id: node_id, + // established: Instant::now(), + // capabilities, + // commands + // }; + // self.active_sessions.insert(node_id, session); + // return Poll::Ready(SessionEvent::SessionAuthenticated { + // node_id, + // capabilities, + // messages: () + // }) + } + PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { + trace!( + ?session_id, + ?remote_addr, + target = "net::session", + "disconnected pending session" + ); + let _ = self.pending_sessions.remove(&session_id); + return match direction { + Direction::Incoming => { + Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error, + }) + } + Direction::Outgoing(node_id) => { + Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { + remote_addr, + node_id, + error, + }) + } + } + } + PendingSessionEvent::OutgoingConnectionError { + remote_addr, + session_id, + node_id, + error, + } => { + trace!( + ?error, + ?session_id, + ?remote_addr, + ?node_id, + target = "net::session", + "connection refused" + ); + let _ = self.pending_sessions.remove(&session_id); + return Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error: None, + }) + } + PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => { + let _ = self.pending_sessions.remove(&session_id); + warn!( + ?error, + ?session_id, + ?remote_addr, + target = "net::session", + "ecies auth failed" + ); + let _ = self.pending_sessions.remove(&session_id); + return Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error: None, + }) + } + } + } + + Poll::Pending + } +} + +/// Configuration options when creating a [`SessionsManager`]. +pub struct SessionsConfig { + /// Size of the session command buffer (per session task). + pub session_command_buffer: usize, + /// Size of the session event channel buffer. + pub session_event_buffer: usize, +} + +impl Default for SessionsConfig { + fn default() -> Self { + SessionsConfig { + // This should be sufficient to slots for handling commands sent to the session task, + // since the manager is the sender. + session_command_buffer: 10, + // This should be greater since the manager is the receiver. The total size will be + // `buffer + num sessions`. Each session can therefor fit at least 1 message in the + // channel. The buffer size is additional capacity. The channel is always drained on + // `poll`. + session_event_buffer: 64, + } + } +} + +impl SessionsConfig { + /// Sets the buffer size for the bounded communication channel between the manager and its + /// sessions for events emitted by the sessions. + /// + /// It is expected, that the background session task will stall if they outpace the manager. The + /// buffer size provides backpressure on the network I/O. + pub fn with_session_event_buffer(mut self, n: usize) -> Self { + self.session_event_buffer = n; + self + } +} + +/// Events produced by the [`SessionManager`] +pub(crate) enum SessionEvent { + /// A new session was successfully authenticated. + /// + /// This session is now able to exchange data. + SessionAuthenticated { + node_id: NodeId, + remote_addr: SocketAddr, + capabilities: Arc, + messages: PeerMessageSender, + }, + /// A session received a valid message via RLPx. + ValidMessage { + node_id: NodeId, + /// Message received from the peer. + message: CapabilityMessage, + }, + /// Received a message that does not match the announced capabilities of the peer. + InvalidMessage { + node_id: NodeId, + /// Announced capabilities of the remote peer. + capabilities: Arc, + /// Message received from the peer. + message: CapabilityMessage, + }, + /// Closed an incoming pending session during authentication. + IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option }, + /// Closed an outgoing pending session during authentication. + OutgoingPendingSessionClosed { + remote_addr: SocketAddr, + node_id: NodeId, + error: Option, + }, + /// Failed to establish a tcp stream + OutgoingConnectionError { remote_addr: SocketAddr, node_id: NodeId, error: io::Error }, + /// Active session was disconnected. + Disconnected { node_id: NodeId, remote_addr: SocketAddr }, +} + +/// The error thrown when the max configured limit has been reached and no more connections are +/// accepted. +#[derive(Debug, Clone, thiserror::Error)] +#[error("Session limit reached {0}")] +pub struct ExceedsSessionLimit(usize); + +/// Starts the authentication process for a connection initiated by a remote peer. +/// +/// This will wait for the _incoming_ handshake request and answer it. +async fn start_pending_incoming_session( + disconnect_rx: oneshot::Receiver<()>, + session_id: SessionId, + stream: TcpStream, + events: mpsc::Sender, + remote_addr: SocketAddr, + secret_key: SecretKey, +) { + authenticate( + disconnect_rx, + events, + stream, + session_id, + remote_addr, + secret_key, + Direction::Incoming, + ) + .await +} + +/// Starts the authentication process for a connection initiated by a remote peer. +#[instrument(skip_all, fields(%remote_addr, node_id), target = "net")] +async fn start_pending_outbound_session( + disconnect_rx: oneshot::Receiver<()>, + events: mpsc::Sender, + session_id: SessionId, + remote_addr: SocketAddr, + remote_node_id: NodeId, + secret_key: SecretKey, +) { + let stream = match TcpStream::connect(remote_addr).await { + Ok(stream) => stream, + Err(error) => { + let _ = events + .send(PendingSessionEvent::OutgoingConnectionError { + remote_addr, + session_id, + node_id: remote_node_id, + error, + }) + .await; + return + } + }; + authenticate( + disconnect_rx, + events, + stream, + session_id, + remote_addr, + secret_key, + Direction::Outgoing(remote_node_id), + ) + .await +} + +/// The direction of the connection. +#[derive(Debug, Copy, Clone)] +pub(crate) enum Direction { + /// Incoming connection. + Incoming, + /// Outgoing connection to a specific node. + Outgoing(NodeId), +} + +async fn authenticate( + disconnect_rx: oneshot::Receiver<()>, + events: mpsc::Sender, + stream: TcpStream, + session_id: SessionId, + remote_addr: SocketAddr, + secret_key: SecretKey, + direction: Direction, +) { + let stream = match direction { + Direction::Incoming => match ECIESStream::incoming(stream, secret_key).await { + Ok(stream) => stream, + Err(error) => { + let _ = events + .send(PendingSessionEvent::EciesAuthError { remote_addr, session_id, error }) + .await; + return + } + }, + Direction::Outgoing(remote_node_id) => { + match ECIESStream::connect(stream, secret_key, remote_node_id).await { + Ok(stream) => stream, + Err(error) => { + let _ = events + .send(PendingSessionEvent::EciesAuthError { + remote_addr, + session_id, + error, + }) + .await; + return + } + } + } + }; + + let unauthed = UnauthedEthStream::new(stream); + let auth = authenticate_stream(unauthed, session_id, remote_addr, direction).boxed(); + + match futures::future::select(disconnect_rx, auth).await { + Either::Left((_, _)) => { + let _ = events + .send(PendingSessionEvent::Disconnected { + remote_addr, + session_id, + direction, + error: None, + }) + .await; + } + Either::Right((res, _)) => { + let _ = events.send(res).await; + } + } +} + +/// Authenticate the stream via handshake +/// +/// On Success return the authenticated stream as [`PendingSessionEvent`] +async fn authenticate_stream( + _stream: UnauthedEthStream>, + _session_id: SessionId, + _remote_addr: SocketAddr, + _direction: Direction, +) -> PendingSessionEvent { + todo!() +} diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs new file mode 100644 index 000000000..4edc36922 --- /dev/null +++ b/crates/net/network/src/state.rs @@ -0,0 +1,217 @@ +//! Keeps track of the state of the network. + +use crate::{ + discovery::{Discovery, DiscoveryEvent}, + fetch::StateFetcher, + message::{Capabilities, CapabilityResponse}, + peers::{PeerAction, PeersManager}, + session::PeerMessageSender, + NodeId, +}; +use futures::FutureExt; + +use reth_interfaces::provider::BlockProvider; +use reth_primitives::{H256, U256}; +use std::{ + collections::{HashMap, VecDeque}, + net::SocketAddr, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; +use tokio::sync::oneshot; +use tracing::trace; + +/// The [`NetworkState`] keeps track of the state of all peers in the network. +/// +/// This includes: +/// - [`Discovery`]: manages the discovery protocol, essentially a stream of discovery updates +/// - [`PeersManager`]: keeps track of connected peers and issues new outgoing connections +/// depending on the configured capacity. +/// - [`StateFetcher`]: streams download request (received from outside via channel) which are +/// then send to the session of the peer. +/// +/// This type is also responsible for responding for received request. +pub struct NetworkState { + /// All connected peers and their state. + connected_peers: HashMap, + /// Manages connections to peers. + peers_manager: PeersManager, + /// Tracks the state of connected peers + peers_state: HashMap, + /// Buffered messages until polled. + queued_messages: VecDeque, + /// The client type that can interact with the chain. + client: Arc, + /// Network discovery. + discovery: Discovery, + /// The type that handles requests. + /// + /// The fetcher streams RLPx related requests on a per-peer basis to this type. This type will + /// then queue in the request and notify the fetcher once the result has been received. + state_fetcher: StateFetcher, +} + +impl NetworkState +where + C: BlockProvider, +{ + /// Create a new state instance with the given params + pub(crate) fn new(client: Arc, discovery: Discovery, peers_manager: PeersManager) -> Self { + Self { + connected_peers: Default::default(), + peers_manager, + peers_state: Default::default(), + queued_messages: Default::default(), + client, + discovery, + state_fetcher: Default::default(), + } + } + + /// Event hook for an authenticated session for the peer. + pub(crate) fn on_session_authenticated( + &mut self, + _node_id: NodeId, + _capabilities: Arc, + _messages: PeerMessageSender, + ) { + // TODO notify fetecher as well + } + + /// Event hook for a disconnected session for the peer. + pub(crate) fn on_session_closed(&mut self, _node_id: NodeId) {} + + /// Propagates Block to peers. + pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) { + // TODO propagate the newblock messages to all connected peers that haven't seen the block + // yet + + todo!() + } + + /// Event hook for events received from the discovery service. + fn on_discovery_event(&mut self, event: DiscoveryEvent) { + match event { + DiscoveryEvent::Discovered(node, addr) => { + self.peers_manager.add_discovered_node(node, addr); + } + } + } + + /// Event hook for new actions derived from the peer management set. + fn on_peer_action(&mut self, action: PeerAction) { + match action { + PeerAction::Connect { node_id, remote_addr } => { + self.peers_state.insert(node_id, PeerSessionState::Connecting); + self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr }); + } + PeerAction::Disconnect { node_id } => { + self.peers_state.remove(&node_id); + self.queued_messages.push_back(StateAction::Disconnect { node_id }); + } + } + } + + /// Disconnect the session + fn disconnect_session(&mut self, _node: NodeId) {} + + /// Invoked when received a response from a connected peer. + fn on_response(&mut self, _node: NodeId, _resp: CapabilityResponse) {} + + /// Advances the state + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + // drain buffered messages + if let Some(message) = self.queued_messages.pop_front() { + return Poll::Ready(message) + } + + while let Poll::Ready(discovery) = self.discovery.poll(cx) { + self.on_discovery_event(discovery); + } + + let mut disconnect_sessions = Vec::new(); + let mut received_responses = Vec::new(); + // poll all connected peers for responses + for (id, peer) in self.connected_peers.iter_mut() { + if let Some(response) = peer.pending_response.as_mut() { + match response.poll_unpin(cx) { + Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)), + Poll::Ready(Err(_)) => { + trace!( + ?id, + target = "net", + "Request canceled, response channel closed." + ); + disconnect_sessions.push(*id); + } + Poll::Pending => continue, + }; + } + + // request has either returned a response or was canceled here + peer.pending_response.take(); + } + + for node in disconnect_sessions { + self.disconnect_session(node) + } + + for (id, resp) in received_responses { + self.on_response(id, resp); + } + + // poll peer manager + while let Poll::Ready(action) = self.peers_manager.poll(cx) { + self.on_peer_action(action); + } + + if self.queued_messages.is_empty() { + return Poll::Pending + } + } + } +} + +/// Tracks the state of a Peer. +/// +/// For example known blocks,so we can decide what to announce. +pub struct ConnectedPeer { + /// Best block of the peer. + pub(crate) best_hash: H256, + /// Best block number of the peer. + pub(crate) best_number: U256, + /// A communication channel directly to the session service. + pub(crate) message_tx: PeerMessageSender, + /// The response receiver for a currently active request to that peer. + pub(crate) pending_response: Option>, +} + +/// Tracks the current state of the peer session +pub enum PeerSessionState { + /// Starting state for outbound connections. + /// + /// This will be triggered by a [`PeerAction::Connect`] action. + /// The peer will reside in the state until the connection has been authenticated. + Connecting, + /// Established connection that hasn't been authenticated yet. + Incoming { + /// How long to keep this open. + until: Instant, + sender: PeerMessageSender, + }, + /// Node is connected to the peer and is ready to + Ready { + /// Communication channel directly to the session task + sender: PeerMessageSender, + }, +} + +/// Message variants triggered by the [`State`] +pub enum StateAction { + /// Create a new connection to the given node. + Connect { remote_addr: SocketAddr, node_id: NodeId }, + /// Disconnect an existing connection + Disconnect { node_id: NodeId }, +} diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs new file mode 100644 index 000000000..c5e631678 --- /dev/null +++ b/crates/net/network/src/swarm.rs @@ -0,0 +1,242 @@ +use crate::{ + listener::{ConnectionListener, ListenerEvent}, + message::{Capabilities, CapabilityMessage}, + session::{SessionEvent, SessionId, SessionManager}, + state::{NetworkState, StateAction}, + NodeId, +}; +use futures::Stream; +use reth_ecies::ECIESError; +use reth_interfaces::provider::BlockProvider; +use std::{ + io, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tracing::warn; + +/// Contains the connectivity related state of the network. +/// +/// A swarm emits [`SwarmEvent`]s when polled. +/// +/// The manages the [`ConnectionListener`] and delegates new incoming connections to the +/// [`SessionsManager`]. Outgoing connections are either initiated on demand or triggered by the +/// [`NetworkState`] and also delegated to the [`NetworkState`]. +#[must_use = "Swarm does nothing unless polled"] +pub struct Swarm { + /// Listens for new incoming connections. + incoming: ConnectionListener, + /// All sessions. + sessions: SessionManager, + /// Tracks the entire state of the network and handles events received from the sessions. + state: NetworkState, +} + +// === impl Swarm === + +impl Swarm +where + C: BlockProvider, +{ + /// Configures a new swarm instance. + pub(crate) fn new( + incoming: ConnectionListener, + sessions: SessionManager, + state: NetworkState, + ) -> Self { + Self { incoming, sessions, state } + } + + /// Mutable access to the state. + pub(crate) fn state_mut(&mut self) -> &mut NetworkState { + &mut self.state + } + + /// Triggers a new outgoing connection to the given node + pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: NodeId) { + self.sessions.dial_outbound(remote_addr, remote_id) + } + + /// Handles a polled [`SessionEvent`] + fn on_session_event(&mut self, event: SessionEvent) -> Option { + match event { + SessionEvent::SessionAuthenticated { node_id, remote_addr, capabilities, messages } => { + self.state.on_session_authenticated(node_id, capabilities, messages); + Some(SwarmEvent::SessionEstablished { node_id, remote_addr }) + } + SessionEvent::ValidMessage { node_id, message } => { + Some(SwarmEvent::CapabilityMessage { node_id, message }) + } + SessionEvent::InvalidMessage { node_id, capabilities, message } => { + Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message }) + } + SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => { + Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error }) + } + SessionEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error } => { + Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error }) + } + SessionEvent::Disconnected { node_id, remote_addr } => { + self.state.on_session_closed(node_id); + Some(SwarmEvent::SessionClosed { node_id, remote_addr }) + } + SessionEvent::OutgoingConnectionError { remote_addr, node_id, error } => { + Some(SwarmEvent::OutgoingConnectionError { node_id, remote_addr, error }) + } + } + } + + /// Callback for events produced by [`ConnectionListener`]. + /// + /// Depending on the event, this will produce a new [`SwarmEvent`]. + fn on_connection(&mut self, event: ListenerEvent) -> Option { + match event { + ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)), + ListenerEvent::ListenerClosed { local_address: address } => { + return Some(SwarmEvent::TcpListenerClosed { remote_addr: address }) + } + ListenerEvent::Incoming { stream, remote_addr } => { + match self.sessions.on_incoming(stream, remote_addr) { + Ok(session_id) => { + return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr }) + } + Err(err) => { + warn!(?err, "Incoming connection rejected"); + } + } + } + } + None + } + + /// Hook for actions pulled from the state + fn on_state_action(&mut self, event: StateAction) -> Option { + match event { + StateAction::Connect { remote_addr, node_id } => { + self.sessions.dial_outbound(remote_addr, node_id); + } + StateAction::Disconnect { node_id } => { + self.sessions.disconnect(node_id); + } + } + None + } +} + +impl Stream for Swarm +where + C: BlockProvider, +{ + type Item = SwarmEvent; + + /// This advances all components. + /// + /// Processes, delegates (internal) commands received from the [`NetworkManager`], then polls + /// the [`SessionManager`] which yields messages produced by individual peer sessions that are + /// then handled. Least priority are incoming connections that are handled and delegated to + /// the [`SessionManager`] to turn them into a session. + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + while let Poll::Ready(action) = this.state.poll(cx) { + if let Some(event) = this.on_state_action(action) { + return Poll::Ready(Some(event)) + } + } + + // poll all sessions + match this.sessions.poll(cx) { + Poll::Pending => {} + Poll::Ready(event) => { + if let Some(event) = this.on_session_event(event) { + return Poll::Ready(Some(event)) + } + continue + } + } + + // poll listener for incoming connections + match Pin::new(&mut this.incoming).poll(cx) { + Poll::Pending => {} + Poll::Ready(event) => { + if let Some(event) = this.on_connection(event) { + return Poll::Ready(Some(event)) + } + continue + } + } + + return Poll::Pending + } + } +} + +/// All events created or delegated by the [`Swarm`] that represents changes to the state of the +/// network. +pub enum SwarmEvent { + /// Events related to the actual network protocol. + CapabilityMessage { + /// The peer that sent the message + node_id: NodeId, + /// Message received from the peer + message: CapabilityMessage, + }, + /// Received a message that does not match the announced capabilities of the peer. + InvalidCapabilityMessage { + node_id: NodeId, + /// Announced capabilities of the remote peer. + capabilities: Arc, + /// Message received from the peer. + message: CapabilityMessage, + }, + /// The underlying tcp listener closed. + TcpListenerClosed { + /// Address of the closed listener. + remote_addr: SocketAddr, + }, + /// The underlying tcp listener encountered an error that we bubble up. + TcpListenerError(io::Error), + /// Received an incoming tcp connection. + /// + /// This represents the first step in the session authentication process. The swarm will + /// produce subsequent events once the stream has been authenticated, or was rejected. + IncomingTcpConnection { + /// The internal session identifier under which this connection is currently tracked. + session_id: SessionId, + /// Address of the remote peer. + remote_addr: SocketAddr, + }, + /// An outbound connection is initiated. + OutgoingTcpConnection { + /// Address of the remote peer. + remote_addr: SocketAddr, + }, + SessionEstablished { + node_id: NodeId, + remote_addr: SocketAddr, + }, + SessionClosed { + node_id: NodeId, + remote_addr: SocketAddr, + }, + /// Closed an incoming pending session during authentication. + IncomingPendingSessionClosed { + remote_addr: SocketAddr, + error: Option, + }, + /// Closed an outgoing pending session during authentication. + OutgoingPendingSessionClosed { + remote_addr: SocketAddr, + node_id: NodeId, + error: Option, + }, + /// Failed to establish a tcp stream to the given address/node + OutgoingConnectionError { + remote_addr: SocketAddr, + node_id: NodeId, + error: io::Error, + }, +} diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs new file mode 100644 index 000000000..d5f12cd28 --- /dev/null +++ b/crates/net/network/src/transactions.rs @@ -0,0 +1,86 @@ +//! Transaction management for the p2p network. + +use crate::{manager::NetworkEvent, NetworkHandle}; +use reth_primitives::{Transaction, H256}; +use reth_transaction_pool::TransactionPool; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// Api to interact with [`TransactionsManager`] task. +pub struct TransactionsHandle { + /// Command channel to the [`TransactionsManager`] + manager_tx: mpsc::UnboundedSender, +} + +/// Manages transactions on top of the p2p network. +/// +/// This can be spawned to another task and is supposed to be run as background service while +/// [`TransactionsHandle`] is used as frontend to send commands to. +/// +/// The [`TransactionsManager`] is responsible for: +/// - handling incoming eth messages for transactions. +/// - serving transaction requests. +/// - propagate transactions +/// +/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions. +/// - receives incoming network messages. +/// - sends messages to dispatch (responses, propagate tx) +/// +/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and +/// propagate new transactions over the network. +#[must_use = "Manager does nothing unless polled."] +pub struct TransactionsManager { + /// Access to the transaction pool. + pool: Pool, + /// Network access. + network: NetworkHandle, + /// Subscriptions to all network related events. + /// + /// From which we get all new incoming transaction related messages. + network_events: UnboundedReceiverStream, + /// All currently pending transactions + pending_transactions: (), + /// All the peers that have sent the same transactions. + peers: HashMap>, + /// Send half for the command channel. + command_tx: mpsc::UnboundedSender, + /// Incoming commands from [`TransactionsHandle`]. + command_rx: UnboundedReceiverStream, +} + +// === impl TransactionsManager === + +impl TransactionsManager +where + Pool: TransactionPool, +{ + /// Sets up a new instance. + pub fn new(network: NetworkHandle, pool: Pool) -> Self { + let network_events = network.event_listener(); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + + Self { + pool, + network, + network_events: UnboundedReceiverStream::new(network_events), + pending_transactions: (), + peers: Default::default(), + command_tx, + command_rx: UnboundedReceiverStream::new(command_rx), + } + } + + /// Returns a new handle that can send commands to this type. + pub fn handle(&self) -> TransactionsHandle { + TransactionsHandle { manager_tx: self.command_tx.clone() } + } + + /// Executes an endless future + pub async fn run(self) {} +} + +/// Commands to send to the [`TransactionManager`] +enum TransactionsCommand { + Propagate(H256), +}