feat(net): network scaffolding (#110)

* feat(net): network scaffolding
This commit is contained in:
Matthias Seitz
2022-11-07 09:27:08 +01:00
committed by GitHub
parent 9c40f8265b
commit b7cdfbf4f9
25 changed files with 2743 additions and 12 deletions

View File

@ -33,6 +33,12 @@ pub struct Discv4Config {
pub ban_duration: Option<Duration>,
/// Nodes to boot from.
pub bootstrap_nodes: HashSet<NodeRecord>,
/// Whether to randomly discover new peers.
///
/// If true, the node will automatically randomly walk the DHT in order to find new peers.
pub enable_dht_random_walk: bool,
/// Whether to automatically lookup peers.
pub enable_lookup: bool,
}
impl Discv4Config {
@ -55,10 +61,13 @@ impl Default for Discv4Config {
permit_ban_list: PermitBanList::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
bootstrap_nodes: Default::default(),
enable_dht_random_walk: true,
enable_lookup: true,
}
}
}
/// Builder type for [`Discv4Config`]
#[derive(Debug, Default)]
pub struct Discv4ConfigBuilder {
config: Discv4Config,
@ -89,6 +98,18 @@ impl Discv4ConfigBuilder {
self
}
/// Whether to discover random nodes in the DHT.
pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self {
self.config.enable_dht_random_walk = enable_dht_random_walk;
self
}
/// Whether to automatically lookup
pub fn enable_lookup(&mut self, enable_lookup: bool) -> &mut Self {
self.config.enable_lookup = enable_lookup;
self
}
/// A set of lists that permit or ban IP's or NodeIds from the server. See
/// `crate::PermitBanList`.
pub fn permit_ban_list(&mut self, list: PermitBanList) -> &mut Self {
@ -122,7 +143,26 @@ impl Discv4ConfigBuilder {
self
}
pub fn build(&mut self) -> Discv4Config {
/// Returns the configured [`Discv4Config`]
pub fn build(&self) -> Discv4Config {
self.config.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_builder() {
let mut builder = Discv4Config::builder();
let _ = builder
.enable_lookup(true)
.enable_dht_random_walk(true)
.add_boot_nodes(HashSet::new())
.ban_duration(None)
.lookup_interval(Duration::from_secs(3))
.enable_lookup(true)
.build();
}
}

View File

@ -57,7 +57,7 @@ pub mod error;
mod proto;
mod config;
pub use config::Discv4Config;
pub use config::{Discv4Config, Discv4ConfigBuilder};
mod node;
pub use node::NodeRecord;
@ -218,11 +218,17 @@ impl Discv4 {
async fn lookup_node(&self, node_id: Option<NodeId>) -> Result<Vec<NodeRecord>, Discv4Error> {
let (tx, rx) = oneshot::channel();
let cmd = Discv4Command::Lookup { node_id, tx };
let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
self.to_service.send(cmd).await?;
Ok(rx.await?)
}
/// Triggers a new self lookup without expecting a response
pub fn send_lookup_self(&self) {
let cmd = Discv4Command::Lookup { node_id: None, tx: None };
let _ = self.to_service.try_send(cmd);
}
/// Returns the receiver half of new listener channel that streams [`TableUpdate`]s.
pub async fn update_stream(&self) -> Result<ReceiverStream<TableUpdate>, Discv4Error> {
let (tx, rx) = oneshot::channel();
@ -326,6 +332,12 @@ impl Discv4Service {
let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout);
let lookup_rotator = if config.enable_dht_random_walk {
LookupTargetRotator::default()
} else {
LookupTargetRotator::local_only()
};
Discv4Service {
local_address,
local_enr,
@ -345,7 +357,7 @@ impl Discv4Service {
ping_interval,
evict_expired_requests_interval,
config,
lookup_rotator: Default::default(),
lookup_rotator,
}
}
@ -846,7 +858,7 @@ impl Discv4Service {
/// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
// trigger self lookup
if self.lookup_interval.poll_tick(cx).is_ready() {
if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
let target = self.lookup_rotator.next(&self.local_enr.id);
self.lookup_with(target, None);
}
@ -856,7 +868,7 @@ impl Discv4Service {
self.evict_expired_requests(Instant::now())
}
// reping some peers
// re-ping some peers
if self.ping_interval.poll_tick(cx).is_ready() {
self.reping_oldest();
}
@ -869,7 +881,7 @@ impl Discv4Service {
match cmd {
Discv4Command::Lookup { node_id, tx } => {
let node_id = node_id.unwrap_or(self.local_enr.id);
self.lookup_with(node_id, Some(tx));
self.lookup_with(node_id, tx);
}
Discv4Command::Updates(tx) => {
let rx = self.update_stream();
@ -998,7 +1010,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
/// The commands sent from the frontend to the service
enum Discv4Command {
Lookup { node_id: Option<NodeId>, tx: NodeRecordSender },
Lookup { node_id: Option<NodeId>, tx: Option<NodeRecordSender> },
Updates(OneshotSender<ReceiverStream<TableUpdate>>),
}
@ -1033,6 +1045,15 @@ struct LookupTargetRotator {
counter: usize,
}
// === impl LookupTargetRotator ===
impl LookupTargetRotator {
/// Returns a rotator that always returns the local target.
fn local_only() -> Self {
Self { interval: 1, counter: 0 }
}
}
impl Default for LookupTargetRotator {
fn default() -> Self {
Self {
@ -1253,6 +1274,25 @@ mod tests {
};
use tracing_test::traced_test;
#[test]
fn test_local_rotator() {
let id = NodeId::random();
let mut rotator = LookupTargetRotator::local_only();
assert_eq!(rotator.next(&id), id);
assert_eq!(rotator.next(&id), id);
}
#[test]
fn test_rotator() {
let id = NodeId::random();
let mut rotator = LookupTargetRotator::default();
assert_eq!(rotator.next(&id), id);
assert_ne!(rotator.next(&id), id);
assert_ne!(rotator.next(&id), id);
assert_ne!(rotator.next(&id), id);
assert_eq!(rotator.next(&id), id);
}
#[tokio::test]
#[traced_test]
async fn test_pending_ping() {
@ -1294,4 +1334,17 @@ mod tests {
println!("total peers {}", table.len());
}
}
#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn test_service_commands() {
let config = Discv4Config::builder().build();
let (discv4, mut service) = create_discv4_with_config(config).await;
service.lookup_self();
let _handle = service.spawn();
discv4.send_lookup_self();
let _ = discv4.lookup_self().await;
}
}

View File

@ -4,6 +4,7 @@ use generic_array::GenericArray;
use reth_primitives::keccak256;
use reth_rlp::{Decodable, DecodeError, Encodable};
use reth_rlp_derive::RlpEncodable;
use secp256k1::{SecretKey, SECP256K1};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
@ -48,6 +49,13 @@ pub struct NodeRecord {
}
impl NodeRecord {
/// Derive the [`NodeRecord`] from the secret key and addr
pub fn from_secret_key(addr: SocketAddr, sk: &SecretKey) -> Self {
let pk = secp256k1::PublicKey::from_secret_key(SECP256K1, sk);
let id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
Self::new(addr, id)
}
/// Creates a new record
#[allow(unused)]
pub(crate) fn new(addr: SocketAddr, id: NodeId) -> Self {

View File

@ -478,7 +478,7 @@ mod tests {
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
let (encoded, _) = msg.encode(&secret_key);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {msg:?}", encoded.len());
let mut neighbours = Neighbours {
nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng))
@ -489,7 +489,7 @@ mod tests {
neighbours.nodes.push(rng_ipv4_record(&mut rng));
let msg = Message::Neighbours(neighbours);
let (encoded, _) = msg.encode(&secret_key);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {msg:?}", encoded.len());
}
}

View File

@ -29,6 +29,11 @@ impl<S> UnauthedEthStream<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
/// Consumes the type and returns the wrapped stream
pub fn into_inner(self) -> S {
self.inner
}
}
impl<S, E> UnauthedEthStream<S>

View File

@ -19,4 +19,4 @@ mod pinger;
pub mod types;
pub use types::*;
pub use ethstream::EthStream;
pub use ethstream::{EthStream, UnauthedEthStream};

View File

@ -63,7 +63,7 @@ impl Decodable for BlockHashOrNumber {
/// traversing towards the latest block.
///
/// If the [`skip`](#structfield.skip) field is non-zero, the peer must skip that amount of headers
/// in the the direction specified by [`reverse`](#structfield.reverse).
/// in the direction specified by [`reverse`](#structfield.reverse).
#[derive(Copy, Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct GetBlockHeaders {
/// The block number or hash that the peer should start returning headers from.

View File

@ -0,0 +1,47 @@
[package]
name = "reth-network"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = """
Ethereum network support
"""
[dependencies]
# reth
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-discv4 = { path = "../discv4" }
reth-eth-wire = { path = "../eth-wire" }
reth-ecies = { path = "../ecies" }
reth-rlp = { path = "../../common/rlp", features = ["smol_str"] }
reth-rlp-derive = { path = "../../common/rlp-derive" }
reth-transaction-pool = { path = "../../transaction-pool" }
# async/futures
futures = "0.3"
pin-project = "1.0"
tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
# misc
aquamarine = "0.1" # docs
tracing = "0.1"
fnv = "1.0"
thiserror = "1.0"
parking_lot = "0.12"
async-trait = "0.1"
bytes = "1.2"
smol_str = { version = "0.1", default-features = false }
either = "1.8"
secp256k1 = { version = "0.24", features = [
"global-context",
"rand-std",
"recovery",
] }
[dev-dependencies]
rand = "0.8"

View File

@ -0,0 +1,129 @@
use crate::{peers::PeersConfig, session::SessionsConfig};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_eth_wire::forkid::ForkId;
use reth_primitives::Chain;
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
/// All network related initialization settings.
pub struct NetworkConfig<C> {
/// The client type that can interact with the chain.
pub client: Arc<C>,
/// The node's secret key, from which the node's identity is derived.
pub secret_key: SecretKey,
/// How to set up discovery.
pub discovery_v4_config: Discv4Config,
/// Address to use for discovery
pub discovery_addr: SocketAddr,
/// Address to listen for incoming connections
pub listener_addr: SocketAddr,
/// How to instantiate peer manager.
pub peers_config: PeersConfig,
/// How to configure the [`SessionManager`]
pub sessions_config: SessionsConfig,
/// A fork identifier as defined by EIP-2124.
/// Serves as the chain compatibility identifier.
pub fork_id: Option<ForkId>,
/// The id of the network
pub chain: Chain,
}
// === impl NetworkConfig ===
impl<C> NetworkConfig<C> {
/// Create a new instance with all mandatory fields set, rest is field with defaults.
pub fn new(client: Arc<C>, secret_key: SecretKey) -> Self {
Self::builder(client, secret_key).build()
}
/// Convenience method for creating the corresponding builder type
pub fn builder(client: Arc<C>, secret_key: SecretKey) -> NetworkConfigBuilder<C> {
NetworkConfigBuilder::new(client, secret_key)
}
/// Sets the config to use for the discovery v4 protocol.
pub fn set_discovery_v4(mut self, discovery_config: Discv4Config) -> Self {
self.discovery_v4_config = discovery_config;
self
}
/// Sets the address for the incoming connection listener.
pub fn set_listener_addr(mut self, listener_addr: SocketAddr) -> Self {
self.listener_addr = listener_addr;
self
}
}
/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
pub struct NetworkConfigBuilder<C> {
/// The client type that can interact with the chain.
client: Arc<C>,
/// The node's secret key, from which the node's identity is derived.
secret_key: SecretKey,
/// How to set up discovery.
discovery_v4_builder: Discv4ConfigBuilder,
/// Address to use for discovery
discovery_addr: Option<SocketAddr>,
/// Listener for incoming connections
listener_addr: Option<SocketAddr>,
/// How to instantiate peer manager.
peers_config: Option<PeersConfig>,
/// How to configure the sessions manager
sessions_config: Option<SessionsConfig>,
fork_id: Option<ForkId>,
chain: Chain,
}
// === impl NetworkConfigBuilder ===
#[allow(missing_docs)]
impl<C> NetworkConfigBuilder<C> {
pub fn new(client: Arc<C>, secret_key: SecretKey) -> Self {
Self {
client,
secret_key,
discovery_v4_builder: Default::default(),
discovery_addr: None,
listener_addr: None,
peers_config: None,
sessions_config: None,
fork_id: None,
chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet),
}
}
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let Self {
client,
secret_key,
discovery_v4_builder,
discovery_addr,
listener_addr,
peers_config,
sessions_config,
fork_id,
chain,
} = self;
NetworkConfig {
client,
secret_key,
discovery_v4_config: discovery_v4_builder.build(),
discovery_addr: discovery_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
}),
listener_addr: listener_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
}),
peers_config: peers_config.unwrap_or_default(),
sessions_config: sessions_config.unwrap_or_default(),
fork_id,
chain,
}
}
}

View File

@ -0,0 +1,152 @@
//! Discovery support for the network.
use crate::{error::NetworkError, NodeId};
use futures::StreamExt;
use reth_discv4::{Discv4, Discv4Config, NodeRecord, TableUpdate};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::SocketAddr,
task::{Context, Poll},
};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
/// An abstraction over the configured discovery protocol.
///
/// Listens for new discovered nodes and emits events for discovered nodes and their address.
pub struct Discovery {
/// All nodes discovered via discovery protocol.
///
/// These nodes can be ephemeral and are updated via the discovery protocol.
discovered_nodes: HashMap<NodeId, SocketAddr>,
/// Local ENR of the discovery service.
local_enr: NodeRecord,
/// Handler to interact with the Discovery v4 service
discv4: Discv4,
/// All updates from the discv4 service.
discv4_updates: ReceiverStream<TableUpdate>,
/// The initial config for the discv4 service
dsicv4_config: Discv4Config,
/// Buffered events until polled.
queued_events: VecDeque<DiscoveryEvent>,
/// The handle to the spawned discv4 service
_discv4_service: JoinHandle<()>,
}
impl Discovery {
/// Spawns the discovery service.
///
/// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener
/// channel to receive all discovered nodes.
pub async fn new(
discovery_addr: SocketAddr,
sk: SecretKey,
dsicv4_config: Discv4Config,
) -> Result<Self, NetworkError> {
let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk);
let (discv4, mut discv4_service) =
Discv4::bind(discovery_addr, local_enr, sk, dsicv4_config.clone())
.await
.map_err(NetworkError::Discovery)?;
let discv4_updates = discv4_service.update_stream();
// spawn the service
let _discv4_service = discv4_service.spawn();
Ok(Self {
local_enr,
discv4,
discv4_updates,
dsicv4_config,
_discv4_service,
discovered_nodes: Default::default(),
queued_events: Default::default(),
})
}
/// Returns the id with which the local identifies itself in the network
pub(crate) fn local_id(&self) -> NodeId {
self.local_enr.id
}
/// Manually adds an address to the set.
pub(crate) fn add_known_address(&mut self, node_id: NodeId, addr: SocketAddr) {
self.on_discv4_update(TableUpdate::Added(NodeRecord {
address: addr.ip(),
tcp_port: addr.port(),
udp_port: addr.port(),
id: node_id,
}))
}
/// Returns all nodes we know exist in the network.
pub fn known_nodes(&mut self) -> &HashMap<NodeId, SocketAddr> {
&self.discovered_nodes
}
fn on_discv4_update(&mut self, update: TableUpdate) {
match update {
TableUpdate::Added(node) => {
let id = node.id;
let addr = node.tcp_addr();
match self.discovered_nodes.entry(id) {
Entry::Occupied(_entry) => {}
Entry::Vacant(entry) => {
entry.insert(addr);
self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr))
}
}
}
TableUpdate::Removed(node) => {
self.discovered_nodes.remove(&node);
}
TableUpdate::Batch(updates) => {
for update in updates {
self.on_discv4_update(update);
}
}
}
}
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DiscoveryEvent> {
loop {
// Drain all buffered events first
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event)
}
while let Poll::Ready(Some(update)) = self.discv4_updates.poll_next_unpin(cx) {
self.on_discv4_update(update)
}
if self.queued_events.is_empty() {
return Poll::Pending
}
}
// drain the update stream
}
}
/// Events produced by the [`Discovery`] manager.
pub enum DiscoveryEvent {
/// A new node was discovered
Discovered(NodeId, SocketAddr),
}
#[cfg(test)]
mod tests {
use super::*;
use rand::thread_rng;
use secp256k1::SECP256K1;
use std::net::{Ipv4Addr, SocketAddrV4};
#[tokio::test(flavor = "multi_thread")]
async fn test_discovery_setup() {
let mut rng = thread_rng();
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let _discovery =
Discovery::new(discovery_addr, secret_key, Default::default()).await.unwrap();
}
}

View File

@ -0,0 +1,12 @@
//! Possible errors when interacting with the network.
/// All error variants for the network
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
/// General IO error.
#[error(transparent)]
Io(#[from] std::io::Error),
/// IO error when creating the discovery service
#[error("Failed to launch discovery service: {0}")]
Discovery(std::io::Error),
}

View File

@ -0,0 +1,186 @@
//! Fetch data from the network.
use crate::{message::RequestResult, NodeId};
use futures::StreamExt;
use reth_eth_wire::{BlockBody, EthMessage};
use reth_interfaces::p2p::headers::client::HeadersRequest;
use reth_primitives::{Header, H256, U256};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
time::Instant,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
/// peers and sends the response once ready.
pub struct StateFetcher {
/// Currently active [`GetBlockHeaders`] requests
inflight_headers_requests: HashMap<NodeId, Request<HeadersRequest, RequestResult<Vec<Header>>>>,
/// The list of available peers for requests.
peers: HashMap<NodeId, Peer>,
/// Requests queued for processing
queued_requests: VecDeque<DownloadRequest>,
/// Receiver for new incoming download requests
download_requests_rx: UnboundedReceiverStream<DownloadRequest>,
/// Sender for download requests, used to detach a [`HeadersDownloader`]
download_requests_tx: UnboundedSender<DownloadRequest>,
}
// === impl StateSyncer ===
impl StateFetcher {
/// Invoked when connected to a new peer.
pub(crate) fn new_connected_peer(
&mut self,
_node_id: NodeId,
_best_hash: H256,
_best_number: U256,
) {
}
/// Returns the next action to return
fn poll_action(&mut self) -> Option<FetchAction> {
// TODO find matching peers
// if let Some(request) = self.queued_requests.pop_front() {
// if let Some(action) = self.on_download_request(request) {
// return Poll::Ready(action)
// }
// }
None
}
fn on_download_request(&mut self, request: DownloadRequest) -> Option<FetchAction> {
match request {
DownloadRequest::GetBlockHeaders { request: _, response: _ } => {}
DownloadRequest::GetBlockBodies { .. } => {}
}
None
}
/// Advance the state the syncer
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
// drain buffered actions first
if let Some(action) = self.poll_action() {
return Poll::Ready(action)
}
loop {
// poll incoming requests
match self.download_requests_rx.poll_next_unpin(cx) {
Poll::Ready(Some(request)) => {
if let Some(action) = self.on_download_request(request) {
return Poll::Ready(action)
}
}
Poll::Ready(None) => {
unreachable!("channel can't close")
}
Poll::Pending => break,
}
}
if self.queued_requests.is_empty() {
return Poll::Pending
}
Poll::Pending
}
/// Called on a `GetBlockHeaders` response from a peer
pub(crate) fn on_block_headers_response(
&mut self,
_from: NodeId,
_msg: RequestResult<Vec<Header>>,
) {
}
/// Returns a new [`HeadersDownloader`] that can send requests to this type
pub(crate) fn headers_downloader(&self) -> HeadersDownloader {
HeadersDownloader { request_tx: self.download_requests_tx.clone() }
}
}
impl Default for StateFetcher {
fn default() -> Self {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
peers: Default::default(),
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
download_requests_tx,
}
}
}
/// Front-end API for downloading headers.
#[derive(Debug)]
pub struct HeadersDownloader {
/// Sender half of the request channel.
request_tx: UnboundedSender<DownloadRequest>,
}
// === impl HeadersDownloader ===
impl HeadersDownloader {
/// Sends a `GetBlockHeaders` request to an available peer.
pub async fn get_block_headers(&self, request: HeadersRequest) -> RequestResult<Vec<Header>> {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?;
rx.await?
}
}
/// Represents a connected peer
struct Peer {
/// Identifier for requests.
request_id: u64,
/// The state this peer currently resides in.
state: PeerState,
/// Best known hash that the peer has
best_hash: H256,
/// Best known number the peer has.
best_number: U256,
}
/// Tracks the state of an individual peer
enum PeerState {
/// Peer is currently not handling requests and is available.
Idle,
/// Peer is handling a `GetBlockHeaders` request.
GetBlockHeaders,
}
/// A request that waits for a response from the network so it can send it back through the response
/// channel.
struct Request<Req, Resp> {
request: Req,
response: oneshot::Sender<Resp>,
started: Instant,
}
/// Requests that can be sent to the Syncer from a [`HeadersDownloader`]
enum DownloadRequest {
/// Download the requested headers and send response through channel
GetBlockHeaders {
request: HeadersRequest,
response: oneshot::Sender<RequestResult<Vec<Header>>>,
},
/// Download the requested headers and send response through channel
GetBlockBodies { request: Vec<H256>, response: oneshot::Sender<RequestResult<Vec<BlockBody>>> },
}
/// An action the syncer can emit.
pub(crate) enum FetchAction {
/// Dispatch an eth request to the given peer.
EthRequest {
node_id: NodeId,
/// The request to send
request: EthMessage,
},
}

View File

@ -0,0 +1,38 @@
#![warn(missing_docs)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
// TODO remove later
#![allow(dead_code)]
//! reth P2P networking.
//!
//! Ethereum's networking protocol is specified in [devp2p](https://github.com/ethereum/devp2p).
//!
//! In order for a node to join the ethereum p2p network it needs to know what nodes are already
//! port of that network. This includes public identities (public key) and addresses (where to reach
//! them).
mod config;
mod discovery;
pub mod error;
mod fetch;
mod listener;
mod manager;
mod message;
mod network;
mod peers;
mod session;
mod state;
mod swarm;
mod transactions;
/// Identifier for a unique node
pub type NodeId = reth_discv4::NodeId;
pub use config::NetworkConfig;
pub use manager::NetworkManager;
pub use network::NetworkHandle;
pub use peers::PeersConfig;

View File

@ -0,0 +1,128 @@
//! Contains connection-oriented interfaces.
use futures::{ready, Stream};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::net::{TcpListener, TcpStream};
/// A tcp connection listener.
///
/// Listens for incoming connections.
#[must_use = "Transport does nothing unless polled."]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct ConnectionListener {
/// Local address of the listener stream.
local_address: SocketAddr,
/// The active tcp listener for incoming connections.
#[pin]
incoming: TcpListenerStream,
}
impl ConnectionListener {
/// Creates a new [`TcpListener`] that listens for incoming connections.
pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
Ok(Self::new(listener, local_addr))
}
/// Creates a new connection listener stream.
pub(crate) fn new(listener: TcpListener, local_address: SocketAddr) -> Self {
Self { local_address, incoming: TcpListenerStream { inner: listener } }
}
/// Polls the type to make progress.
pub fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenerEvent> {
let this = self.project();
match ready!(this.incoming.poll_next(cx)) {
Some(Ok((stream, remote_addr))) => {
Poll::Ready(ListenerEvent::Incoming { stream, remote_addr })
}
Some(Err(err)) => Poll::Ready(ListenerEvent::Error(err)),
None => {
Poll::Ready(ListenerEvent::ListenerClosed { local_address: *this.local_address })
}
}
}
/// Returns the socket address this listener listens on.
pub fn local_address(&self) -> SocketAddr {
self.local_address
}
}
/// Event type produced by the [`Transport`].
pub enum ListenerEvent {
/// Received a new incoming.
Incoming {
/// Accepted connection
stream: TcpStream,
/// Address of the remote peer.
remote_addr: SocketAddr,
},
/// Returned when the underlying connection listener has been closed.
///
/// This is the case if the [`TcpListenerStream`] should ever return `None`
ListenerClosed {
/// Address of the closed listener.
local_address: SocketAddr,
},
/// Encountered an error when accepting a connection.
///
/// This is non-fatal error as the listener continues to listen for new connections to accept.
Error(io::Error),
}
/// A stream of incoming [`TcpStream`]s.
#[derive(Debug)]
struct TcpListenerStream {
/// listener for incoming connections.
inner: TcpListener,
}
impl Stream for TcpListenerStream {
type Item = io::Result<(TcpStream, SocketAddr)>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.poll_accept(cx) {
Poll::Ready(Ok(conn)) => Poll::Ready(Some(Ok(conn))),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::pin_mut;
use std::net::{Ipv4Addr, SocketAddrV4};
use tokio::macros::support::poll_fn;
#[tokio::test(flavor = "multi_thread")]
async fn test_incoming_listener() {
let listener =
ConnectionListener::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.await
.unwrap();
let local_addr = listener.local_address();
tokio::task::spawn(async move {
pin_mut!(listener);
match poll_fn(|cx| listener.as_mut().poll(cx)).await {
ListenerEvent::Incoming { .. } => {}
_ => {
panic!("unexpected event")
}
}
});
let _ = TcpStream::connect(local_addr).await.unwrap();
}
}

View File

@ -0,0 +1,324 @@
//! High level network management.
//!
//! The [`Network`] contains the state of the network as a whole. It controls how connections are
//! handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers depending on their announced capabilities via their RLPx sessions. Most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md)(`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Responsible for peer discovery is ethereum's discovery protocol (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the RLPx session.
use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
listener::ConnectionListener,
message::{Capabilities, CapabilityMessage},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
NodeId,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::EthMessage;
use reth_interfaces::provider::BlockProvider;
use std::{
net::SocketAddr,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, trace};
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// ```mermaid
/// graph TB
/// handle(NetworkHandle)
/// events(NetworkEvents)
/// subgraph NetworkManager
/// direction LR
/// subgraph Swarm
/// direction TB
/// B1[(Peer Sessions)]
/// B2[(Connection Lister)]
/// B3[(State)]
/// end
/// end
/// handle <--> |request/response channel| NetworkManager
/// NetworkManager --> |Network events| events
/// ```
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<C> {
/// The type that manages the actual network part, which includes connections.
swarm: Swarm<C>,
/// Underlying network handle that can be shared.
handle: NetworkHandle,
/// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage>,
/// Handles block imports.
block_import_sink: (),
/// The address of this node that listens for incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// All listeners for [`Network`] events.
event_listeners: NetworkEventListeners,
/// Tracks the number of active session (connected peers).
///
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_active_peers: Arc<AtomicUsize>,
/// Local copy of the `NodeId` of the local node.
local_node_id: NodeId,
}
// === impl NetworkManager ===
impl<C> NetworkManager<C>
where
C: BlockProvider,
{
/// Creates the manager of a new network.
///
/// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
/// state of the entire network.
pub async fn new(config: NetworkConfig<C>) -> Result<Self, NetworkError> {
let NetworkConfig {
client,
secret_key,
discovery_v4_config,
discovery_addr,
listener_addr,
peers_config,
sessions_config,
..
} = config;
let peers_manger = PeersManager::new(peers_config);
let peers_handle = peers_manger.handle();
let incoming = ConnectionListener::bind(listener_addr).await?;
let listener_address = Arc::new(Mutex::new(incoming.local_address()));
let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;
// need to retrieve the addr here since provided port could be `0`
let local_node_id = discovery.local_id();
// TODO this should also need sk for encrypted sessions
let sessions = SessionManager::new(secret_key, sessions_config);
let state = NetworkState::new(client, discovery, peers_manger);
let swarm = Swarm::new(incoming, sessions, state);
let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
let num_active_peers = Arc::new(AtomicUsize::new(0));
let handle = NetworkHandle::new(
Arc::clone(&num_active_peers),
Arc::clone(&listener_address),
to_manager_tx,
local_node_id,
peers_handle,
);
Ok(Self {
swarm,
handle,
from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
block_import_sink: (),
listener_address,
event_listeners: Default::default(),
num_active_peers,
local_node_id,
})
}
/// Returns the [`NetworkHandle`] that can be cloned and shared.
///
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
pub fn handle(&self) -> &NetworkHandle {
&self.handle
}
/// Event hook for an unexpected message from the peer.
fn on_invalid_message(
&self,
_node_id: NodeId,
_capabilities: Arc<Capabilities>,
_message: CapabilityMessage,
) {
// TODO: disconnect?
}
/// Handles a received [`CapabilityMessage`] from the peer.
fn on_capability_message(&mut self, _node_id: NodeId, msg: CapabilityMessage) {
match msg {
CapabilityMessage::Eth(eth) => {
match eth {
EthMessage::Status(_) => {}
EthMessage::NewBlockHashes(_) => {
// update peer's state, to track what blocks this peer has seen
}
EthMessage::NewBlock(_) => {
// emit new block and track that the peer knows this block
}
EthMessage::Transactions(_) => {
// need to emit this as event/send to tx handler
}
EthMessage::NewPooledTransactionHashes(_) => {
// need to emit this as event/send to tx handler
}
// TODO: should remove the response types here, as they are handled separately
EthMessage::GetBlockHeaders(_) => {}
EthMessage::BlockHeaders(_) => {}
EthMessage::GetBlockBodies(_) => {}
EthMessage::BlockBodies(_) => {}
EthMessage::GetPooledTransactions(_) => {}
EthMessage::PooledTransactions(_) => {}
EthMessage::GetNodeData(_) => {}
EthMessage::NodeData(_) => {}
EthMessage::GetReceipts(_) => {}
EthMessage::Receipts(_) => {}
}
}
CapabilityMessage::Other(_) => {
// other subprotocols
}
}
}
/// Handler for received messages from a handle
fn on_handle_message(&mut self, msg: NetworkHandleMessage) {
match msg {
NetworkHandleMessage::EventListener(tx) => {
self.event_listeners.listeners.push(tx);
}
NetworkHandleMessage::NewestBlock(_, _) => {}
_ => {}
}
}
}
impl<C> Future for NetworkManager<C>
where
C: BlockProvider,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// process incoming messages from a handle
loop {
match this.from_handle_rx.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// This is only possible if the channel was deliberately closed since we always
// have an instance of `NetworkHandle`
error!("network message channel closed.");
return Poll::Ready(())
}
Poll::Ready(Some(msg)) => this.on_handle_message(msg),
};
}
// advance the swarm
while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) {
// handle event
match event {
SwarmEvent::CapabilityMessage { node_id, message } => {
this.on_capability_message(node_id, message)
}
SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => {
this.on_invalid_message(node_id, capabilities, message)
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(?remote_addr, target = "net", "TCP listener closed.");
}
SwarmEvent::TcpListenerError(err) => {
trace!(?err, target = "net", "TCP connection error.");
}
SwarmEvent::IncomingTcpConnection { remote_addr, .. } => {
trace!(?remote_addr, target = "net", "Incoming connection");
}
SwarmEvent::OutgoingTcpConnection { remote_addr } => {
trace!(?remote_addr, target = "net", "Starting outbound connection.");
}
SwarmEvent::SessionEstablished { node_id, remote_addr } => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
trace!(
?remote_addr,
?node_id,
?total_active,
target = "net",
"Session established"
);
}
SwarmEvent::SessionClosed { node_id, remote_addr } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
trace!(
?remote_addr,
?node_id,
?total_active,
target = "net",
"Session disconnected"
);
}
SwarmEvent::IncomingPendingSessionClosed { .. } => {}
SwarmEvent::OutgoingPendingSessionClosed { .. } => {}
SwarmEvent::OutgoingConnectionError { .. } => {}
}
}
todo!()
}
}
/// Events emitted by the network that are of interest for subscribers.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
EthMessage { node_id: NodeId, message: EthMessage },
}
/// Bundles all listeners for [`NetworkEvent`]s.
#[derive(Default)]
struct NetworkEventListeners {
/// All listeners for an event
listeners: Vec<mpsc::UnboundedSender<NetworkEvent>>,
}
// === impl NetworkEventListeners ===
impl NetworkEventListeners {
/// Sends the event to all listeners.
///
/// Remove channels that got closed.
fn send(&mut self, event: NetworkEvent) {
self.listeners.retain(|listener| {
let open = listener.send(event.clone()).is_ok();
if !open {
trace!(target = "net", "event listener channel closed",);
}
open
});
}
}

View File

@ -0,0 +1,143 @@
//! Capability messaging
//!
//! An RLPx stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use bytes::{BufMut, Bytes};
use reth_eth_wire::{BlockHeaders, EthMessage, GetBlockHeaders};
use reth_rlp::{Decodable, DecodeError, Encodable};
use reth_rlp_derive::{RlpDecodable, RlpEncodable};
use smol_str::SmolStr;
use tokio::sync::{mpsc, oneshot};
/// Result alias for result of a request.
pub type RequestResult<T> = Result<T, RequestError>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel.")]
ChannelClosed,
#[error("Not connected to the node.")]
NotConnected,
#[error("Capability Message is not supported by remote peer.")]
UnsupportedCapability,
#[error("Network error: {0}")]
Io(String),
}
impl<T> From<mpsc::error::SendError<T>> for RequestError {
fn from(_: mpsc::error::SendError<T>) -> Self {
RequestError::ChannelClosed
}
}
impl From<oneshot::error::RecvError> for RequestError {
fn from(_: oneshot::error::RecvError) -> Self {
RequestError::ChannelClosed
}
}
/// Represents all capabilities of a node.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Capabilities {
/// All Capabilities and their versions
inner: Vec<Capability>,
eth_66: bool,
eth_67: bool,
}
impl Capabilities {
/// Whether this peer supports eth v66 protocol.
#[inline]
pub fn supports_eth_v66(&self) -> bool {
self.eth_66
}
/// Whether this peer supports eth v67 protocol.
#[inline]
pub fn supports_eth_v67(&self) -> bool {
self.eth_67
}
}
impl Encodable for Capabilities {
fn encode(&self, out: &mut dyn BufMut) {
self.inner.encode(out)
}
}
impl Decodable for Capabilities {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let inner = Vec::<Capability>::decode(buf)?;
Ok(Self {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
inner,
})
}
}
/// Represents an announced Capability in the `Hello` message
#[derive(Debug, Clone, Eq, PartialEq, RlpDecodable, RlpEncodable)]
pub struct Capability {
/// Name of the Capability
pub name: SmolStr,
/// The version of the capability
pub version: u64,
}
// === impl Capability ===
impl Capability {
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
self.name == "eth" && self.version == 66
}
/// Whether this is eth v67.
#[inline]
pub fn is_eth_v67(&self) -> bool {
self.name == "eth" && self.version == 67
}
}
/// A Capability message consisting of the message-id and the payload
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RawCapabilityMessage {
/// Identifier of the message.
pub id: usize,
/// Actual payload
pub payload: Bytes,
}
/// Various protocol related event types bubbled up from a session that need to be handled by the
/// network.
#[derive(Debug)]
pub enum CapabilityMessage {
/// Eth sub-protocol message.
Eth(EthMessage),
/// Any other capability message.
Other(RawCapabilityMessage),
}
/// Protocol related request messages that expect a response
#[derive(Debug)]
pub enum CapabilityRequest {
/// Request Block headers from the peer.
///
/// The response should be sent through the channel.
GetBlockHeaders {
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders>>,
},
}
/// The actual response object
#[derive(Debug)]
pub enum CapabilityResponse {
GetBlockHeaders(RequestResult<BlockHeaders>),
}

View File

@ -0,0 +1,73 @@
use crate::{manager::NetworkEvent, peers::PeersHandle, NodeId};
use parking_lot::Mutex;
use reth_primitives::{H256, U256};
use std::{
net::SocketAddr,
sync::{atomic::AtomicUsize, Arc},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
/// A _shareable_ network frontend. Used to interact with the network.
///
/// See also [`NetworkManager`](crate::NetworkManager).
#[derive(Clone)]
pub struct NetworkHandle {
/// The Arc'ed delegate that contains the state.
inner: Arc<NetworkInner>,
}
// === impl NetworkHandle ===
impl NetworkHandle {
/// Creates a single new instance.
pub(crate) fn new(
num_active_peers: Arc<AtomicUsize>,
listener_address: Arc<Mutex<SocketAddr>>,
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
local_node_id: NodeId,
peers: PeersHandle,
) -> Self {
let inner = NetworkInner {
num_active_peers,
to_manager_tx,
listener_address,
local_node_id,
peers,
};
Self { inner: Arc::new(inner) }
}
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage> {
&self.inner.to_manager_tx
}
/// Creates a new [`NetworkEvent`] listener channel.
pub fn event_listener(&self) -> mpsc::UnboundedReceiver<NetworkEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::EventListener(tx));
rx
}
}
struct NetworkInner {
/// Number of active peer sessions the node's currently handling.
num_active_peers: Arc<AtomicUsize>,
/// Sender half of the message channel to the [`NetworkManager`].
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
/// The local address that accepts incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// The identifier used by this node.
local_node_id: NodeId,
/// Access to the all the nodes
peers: PeersHandle, // TODO need something to access
}
/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
pub(crate) enum NetworkHandleMessage {
/// Add a new listener for [`NetworkEvent`].
EventListener(UnboundedSender<NetworkEvent>),
/// Broadcast event to announce a new block to all nodes.
AnnounceBlock,
/// Returns the newest imported block by the network.
NewestBlock(H256, U256),
}

View File

@ -0,0 +1,175 @@
use reth_discv4::NodeId;
use futures::StreamExt;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::SocketAddr,
task::{Context, Poll},
time::Duration,
};
use tokio::{
sync::mpsc,
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A communication channel to the [`PeersManager`] to apply changes to the peer set.
pub struct PeersHandle {
manager_tx: mpsc::UnboundedSender<PeerCommand>,
}
/// Maintains the state of _all_ the peers known to the network.
///
/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
///
/// The [`PeersManager`] will be notified on peer related changes
pub(crate) struct PeersManager {
/// All peers known to the network
peers: HashMap<NodeId, Node>,
/// Copy of the receiver half, so new [`PeersHandle`] can be created on demand.
manager_tx: mpsc::UnboundedSender<PeerCommand>,
/// Receiver half of the command channel.
handle_rx: UnboundedReceiverStream<PeerCommand>,
/// Buffered actions until the manager is polled.
actions: VecDeque<PeerAction>,
/// Interval for triggering connections if there are free slots.
refill_slots_interval: Interval,
/// Tracks current slot stats.
connection_info: ConnectionInfo,
}
impl PeersManager {
/// Create a new instance with the given config
pub(crate) fn new(config: PeersConfig) -> Self {
let PeersConfig { refill_slots_interval, connection_info } = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
Self {
peers: Default::default(),
manager_tx,
handle_rx: UnboundedReceiverStream::new(handle_rx),
actions: Default::default(),
refill_slots_interval: tokio::time::interval_at(
Instant::now() + refill_slots_interval,
refill_slots_interval,
),
connection_info,
}
}
/// Returns a new [`PeersHandle`] that can send commands to this type
pub(crate) fn handle(&self) -> PeersHandle {
PeersHandle { manager_tx: self.manager_tx.clone() }
}
pub(crate) fn add_discovered_node(&mut self, node: NodeId, addr: SocketAddr) {
match self.peers.entry(node) {
Entry::Occupied(_) => {}
Entry::Vacant(entry) => {
entry.insert(Node::new(addr));
}
}
}
pub(crate) fn remove_discovered_node(&mut self, _node: NodeId) {}
/// If there's capacity for new outbound connections, this will queue new
/// [`PeerAction::Connect`] actions.
fn fill_outbound_slots(&mut self) {
// This checks if there are free slots for new outbound connections available that can be
// filled
}
/// Advances the state.
///
/// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
/// [`PeersManager::poll_next`] is called.
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
loop {
// drain buffered actions
if let Some(action) = self.actions.pop_front() {
return Poll::Ready(action)
}
if self.refill_slots_interval.poll_tick(cx).is_ready() {
self.fill_outbound_slots();
}
while let Poll::Ready(Some(_cmd)) = self.handle_rx.poll_next_unpin(cx) {
// TODO handle incoming command
}
}
}
}
/// Tracks stats about connected nodes
#[derive(Debug)]
pub struct ConnectionInfo {
/// Currently occupied slots for outbound connections.
num_outbound: usize,
/// Currently occupied slots for inbound connections.
num_inbound: usize,
/// Maximum allowed outbound connections.
max_outbound: usize,
/// Maximum allowed inbound connections.
max_inbound: usize,
}
/// Tracks info about a single node.
struct Node {
/// Where to reach the node
addr: SocketAddr,
/// Reputation of the node.
reputation: i32,
}
// === impl Node ===
impl Node {
fn new(addr: SocketAddr) -> Self {
Self { addr, reputation: 0 }
}
}
/// Commands the [`PeersManager`] listens for.
pub enum PeerCommand {
Add(NodeId),
Remove(NodeId),
// TODO reputation change
}
/// Actions the peer manager can trigger.
#[derive(Debug)]
pub enum PeerAction {
/// Start a new connection to a peer.
Connect {
/// The peer to connect to.
node_id: NodeId,
/// Where to reach the node
remote_addr: SocketAddr,
},
/// Disconnect an existing connection.
Disconnect { node_id: NodeId },
}
/// Config type for initiating a [`PeersManager`] instance
#[derive(Debug)]
pub struct PeersConfig {
/// How even to recheck free slots for outbound connections
pub refill_slots_interval: Duration,
/// Restrictions on connections
pub connection_info: ConnectionInfo,
}
impl Default for PeersConfig {
fn default() -> Self {
Self {
refill_slots_interval: Duration::from_millis(1_000),
connection_info: ConnectionInfo {
num_outbound: 0,
num_inbound: 0,
max_outbound: 70,
max_inbound: 30,
},
}
}
}

View File

@ -0,0 +1,125 @@
//! Session handles
use crate::{
message::{Capabilities, CapabilityMessage},
session::{Direction, SessionId},
NodeId,
};
use reth_ecies::{stream::ECIESStream, ECIESError};
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
};
/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
/// message which exchanges the `capabilities` of the peer.
///
/// This session needs to wait until it is authenticated.
#[derive(Debug)]
pub(crate) struct PendingSessionHandle {
/// Can be used to tell the session to disconnect the connection/abort the handshake process.
pub(crate) disconnect_tx: oneshot::Sender<()>,
}
/// An established session with a remote peer.
///
/// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can
/// be performed: chain synchronization, block propagation and transaction exchange.
#[derive(Debug)]
pub(crate) struct ActiveSessionHandle {
/// The assigned id for this session
pub(crate) session_id: SessionId,
/// The identifier of the remote peer
pub(crate) remote_id: NodeId,
/// The timestamp when the session has been established.
pub(crate) established: Instant,
/// Announced capabilities of the peer.
pub(crate) capabilities: Arc<Capabilities>,
/// Sender half of the command channel used send commands _to_ the spawned session
pub(crate) commands: mpsc::Sender<SessionCommand>,
}
// === impl ActiveSessionHandle ===
impl ActiveSessionHandle {
/// Sends a disconnect command to the session.
pub(crate) fn disconnect(&self) {
// Note: we clone the sender which ensures the channel has capacity to send the message
let _ = self.commands.clone().try_send(SessionCommand::Disconnect);
}
}
/// Events a pending session can produce.
///
/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
///
/// A session starts with a `Handshake`, followed by a `Hello` message which
#[derive(Debug)]
pub(crate) enum PendingSessionEvent {
/// Initial handshake step was successful <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#initial-handshake>
SuccessfulHandshake { remote_addr: SocketAddr, session_id: SessionId },
/// Represents a successful `Hello` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
Hello {
session_id: SessionId,
node_id: NodeId,
capabilities: Arc<Capabilities>,
stream: ECIESStream<TcpStream>,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {
remote_addr: SocketAddr,
session_id: SessionId,
direction: Direction,
error: Option<ECIESError>,
},
/// Thrown when unable to establish a [`TcpStream`].
OutgoingConnectionError {
remote_addr: SocketAddr,
session_id: SessionId,
node_id: NodeId,
error: io::Error,
},
/// Thrown when authentication via Ecies failed.
EciesAuthError { remote_addr: SocketAddr, session_id: SessionId, error: ECIESError },
}
/// Commands that can be sent to the spawned session.
#[derive(Debug)]
pub(crate) enum SessionCommand {
/// Disconnect the connection
Disconnect,
Message(CapabilityMessage),
}
/// Message variants an active session can produce and send back to the
/// [`SessionManager`](crate::session::SessionManager)
#[derive(Debug)]
pub(crate) enum ActiveSessionMessage {
/// Session disconnected.
Closed { node_id: NodeId, remote_addr: SocketAddr },
/// A session received a valid message via RLPx.
ValidMessage {
/// Identifier of the remote peer.
node_id: NodeId,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
/// Identifier of the remote peer.
node_id: NodeId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
message: CapabilityMessage,
},
}
/// A Cloneable connection for sending messages directly to the session of a peer.
#[derive(Debug, Clone)]
pub struct PeerMessageSender {
/// id of the remote node.
pub(crate) peer: NodeId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<CapabilityMessage>,
}

View File

@ -0,0 +1,518 @@
//! Support for handling peer sessions.
use crate::{
message::{Capabilities, CapabilityMessage},
session::handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
},
NodeId,
};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
pub use handle::PeerMessageSender;
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::UnauthedEthStream;
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinSet,
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{instrument, trace, warn};
mod handle;
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
/// Manages a set of sessions.
#[must_use = "Session Manager must be polled to process session events."]
pub(crate) struct SessionManager {
/// Tracks the identifier for the next session.
next_id: usize,
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The node id of node
node_id: NodeId,
/// Size of the command buffer per session.
session_command_buffer: usize,
/// All spawned session tasks.
///
/// Note: If dropped, the session tasks are aborted.
spawned_tasks: JoinSet<()>,
/// All pending session that are currently handshaking, exchanging `Hello`s.
///
/// Events produced during the authentication phase are reported to this manager. Once the
/// session is authenticated, it can be moved to the `active_session` set.
pending_sessions: FnvHashMap<SessionId, PendingSessionHandle>,
/// All active sessions that are ready to exchange messages.
active_sessions: HashMap<NodeId, ActiveSessionHandle>,
/// The original Sender half of the [`PendingSessionEvent`] channel.
///
/// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
/// get a clone of this sender half.
pending_sessions_tx: mpsc::Sender<PendingSessionEvent>,
/// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
pending_session_rx: ReceiverStream<PendingSessionEvent>,
/// The original Sender half of the [`ActiveSessionEvent`] channel.
///
/// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
/// clone of this sender half.
active_session_tx: mpsc::Sender<ActiveSessionMessage>,
/// Receiver half that listens for [`ActiveSessionEvent`] produced by pending sessions.
active_session_rx: ReceiverStream<ActiveSessionMessage>,
}
// === impl SessionManager ===
impl SessionManager {
/// Creates a new empty [`SessionManager`].
pub(crate) fn new(secret_key: SecretKey, config: SessionsConfig) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
let pk = secret_key.public_key(SECP256K1);
let node_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
Self {
next_id: 0,
secret_key,
node_id,
session_command_buffer: config.session_command_buffer,
spawned_tasks: Default::default(),
pending_sessions: Default::default(),
active_sessions: Default::default(),
pending_sessions_tx,
pending_session_rx: ReceiverStream::new(pending_sessions_rx),
active_session_tx,
active_session_rx: ReceiverStream::new(active_session_rx),
}
}
/// Returns the next unique [`SessionId`].
fn next_id(&mut self) -> SessionId {
let id = self.next_id;
self.next_id += 1;
SessionId(id)
}
/// Spawns the given future onto a new task that is tracked in the `spawned_tasks` [`JoinSet`].
fn spawn<F>(&mut self, f: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.spawned_tasks.spawn(async move { f.await });
}
/// A incoming TCP connection was received. This starts the authentication process to turn this
/// stream into an active peer session.
///
/// Returns an error if the configured limit has been reached.
pub(crate) fn on_incoming(
&mut self,
stream: TcpStream,
remote_addr: SocketAddr,
) -> Result<SessionId, ExceedsSessionLimit> {
// TODO(mattsse): enforce limits
let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
self.spawn(start_pending_incoming_session(
disconnect_rx,
session_id,
stream,
pending_events,
remote_addr,
self.secret_key,
));
let handle = PendingSessionHandle { disconnect_tx };
self.pending_sessions.insert(session_id, handle);
Ok(session_id)
}
/// Starts a new pending session from the local node to the given remote node.
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: NodeId) {
let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
self.spawn(start_pending_outbound_session(
disconnect_rx,
pending_events,
session_id,
remote_addr,
remote_node_id,
self.secret_key,
));
let handle = PendingSessionHandle { disconnect_tx };
self.pending_sessions.insert(session_id, handle);
}
/// Initiates a shutdown of the channel.
///
/// This will trigger the disconnect on the session task to gracefully terminate. The result
/// will be picked up by the receiver.
pub(crate) fn disconnect(&self, node: NodeId) {
if let Some(session) = self.active_sessions.get(&node) {
session.disconnect();
}
}
/// This polls all the session handles and returns [`SessionEvent`].
///
/// Active sessions are prioritized.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent> {
// Poll events from active sessions
match self.active_session_rx.poll_next_unpin(cx) {
Poll::Pending => {}
Poll::Ready(None) => {
unreachable!("Manager holds both channel halves.")
}
Poll::Ready(Some(event)) => {
return match event {
ActiveSessionMessage::Closed { node_id, remote_addr } => {
trace!(?node_id, target = "net::session", "closed active session.");
let _ = self.active_sessions.remove(&node_id);
Poll::Ready(SessionEvent::Disconnected { node_id, remote_addr })
}
ActiveSessionMessage::ValidMessage { node_id, message } => {
// TODO: since all messages are known they should be decoded in the session
Poll::Ready(SessionEvent::ValidMessage { node_id, message })
}
ActiveSessionMessage::InvalidMessage { node_id, capabilities, message } => {
Poll::Ready(SessionEvent::InvalidMessage { node_id, message, capabilities })
}
}
}
}
// Poll the pending session event stream
loop {
let event = match self.pending_session_rx.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
Poll::Ready(Some(event)) => event,
};
match event {
PendingSessionEvent::SuccessfulHandshake { remote_addr, session_id } => {
trace!(
?session_id,
?remote_addr,
target = "net::session",
"successful handshake"
);
}
PendingSessionEvent::Hello {
session_id,
node_id: _,
capabilities: _,
stream: _,
} => {
// move from pending to established.
let _ = self.pending_sessions.remove(&session_id);
// TODO spawn the authenticated session
// let session = ActiveSessionHandle {
// session_id,
// remote_id: node_id,
// established: Instant::now(),
// capabilities,
// commands
// };
// self.active_sessions.insert(node_id, session);
// return Poll::Ready(SessionEvent::SessionAuthenticated {
// node_id,
// capabilities,
// messages: ()
// })
}
PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
trace!(
?session_id,
?remote_addr,
target = "net::session",
"disconnected pending session"
);
let _ = self.pending_sessions.remove(&session_id);
return match direction {
Direction::Incoming => {
Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
remote_addr,
error,
})
}
Direction::Outgoing(node_id) => {
Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
remote_addr,
node_id,
error,
})
}
}
}
PendingSessionEvent::OutgoingConnectionError {
remote_addr,
session_id,
node_id,
error,
} => {
trace!(
?error,
?session_id,
?remote_addr,
?node_id,
target = "net::session",
"connection refused"
);
let _ = self.pending_sessions.remove(&session_id);
return Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
remote_addr,
error: None,
})
}
PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => {
let _ = self.pending_sessions.remove(&session_id);
warn!(
?error,
?session_id,
?remote_addr,
target = "net::session",
"ecies auth failed"
);
let _ = self.pending_sessions.remove(&session_id);
return Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
remote_addr,
error: None,
})
}
}
}
Poll::Pending
}
}
/// Configuration options when creating a [`SessionsManager`].
pub struct SessionsConfig {
/// Size of the session command buffer (per session task).
pub session_command_buffer: usize,
/// Size of the session event channel buffer.
pub session_event_buffer: usize,
}
impl Default for SessionsConfig {
fn default() -> Self {
SessionsConfig {
// This should be sufficient to slots for handling commands sent to the session task,
// since the manager is the sender.
session_command_buffer: 10,
// This should be greater since the manager is the receiver. The total size will be
// `buffer + num sessions`. Each session can therefor fit at least 1 message in the
// channel. The buffer size is additional capacity. The channel is always drained on
// `poll`.
session_event_buffer: 64,
}
}
}
impl SessionsConfig {
/// Sets the buffer size for the bounded communication channel between the manager and its
/// sessions for events emitted by the sessions.
///
/// It is expected, that the background session task will stall if they outpace the manager. The
/// buffer size provides backpressure on the network I/O.
pub fn with_session_event_buffer(mut self, n: usize) -> Self {
self.session_event_buffer = n;
self
}
}
/// Events produced by the [`SessionManager`]
pub(crate) enum SessionEvent {
/// A new session was successfully authenticated.
///
/// This session is now able to exchange data.
SessionAuthenticated {
node_id: NodeId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
messages: PeerMessageSender,
},
/// A session received a valid message via RLPx.
ValidMessage {
node_id: NodeId,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
node_id: NodeId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option<ECIESError> },
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: NodeId,
error: Option<ECIESError>,
},
/// Failed to establish a tcp stream
OutgoingConnectionError { remote_addr: SocketAddr, node_id: NodeId, error: io::Error },
/// Active session was disconnected.
Disconnected { node_id: NodeId, remote_addr: SocketAddr },
}
/// The error thrown when the max configured limit has been reached and no more connections are
/// accepted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Session limit reached {0}")]
pub struct ExceedsSessionLimit(usize);
/// Starts the authentication process for a connection initiated by a remote peer.
///
/// This will wait for the _incoming_ handshake request and answer it.
async fn start_pending_incoming_session(
disconnect_rx: oneshot::Receiver<()>,
session_id: SessionId,
stream: TcpStream,
events: mpsc::Sender<PendingSessionEvent>,
remote_addr: SocketAddr,
secret_key: SecretKey,
) {
authenticate(
disconnect_rx,
events,
stream,
session_id,
remote_addr,
secret_key,
Direction::Incoming,
)
.await
}
/// Starts the authentication process for a connection initiated by a remote peer.
#[instrument(skip_all, fields(%remote_addr, node_id), target = "net")]
async fn start_pending_outbound_session(
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent>,
session_id: SessionId,
remote_addr: SocketAddr,
remote_node_id: NodeId,
secret_key: SecretKey,
) {
let stream = match TcpStream::connect(remote_addr).await {
Ok(stream) => stream,
Err(error) => {
let _ = events
.send(PendingSessionEvent::OutgoingConnectionError {
remote_addr,
session_id,
node_id: remote_node_id,
error,
})
.await;
return
}
};
authenticate(
disconnect_rx,
events,
stream,
session_id,
remote_addr,
secret_key,
Direction::Outgoing(remote_node_id),
)
.await
}
/// The direction of the connection.
#[derive(Debug, Copy, Clone)]
pub(crate) enum Direction {
/// Incoming connection.
Incoming,
/// Outgoing connection to a specific node.
Outgoing(NodeId),
}
async fn authenticate(
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent>,
stream: TcpStream,
session_id: SessionId,
remote_addr: SocketAddr,
secret_key: SecretKey,
direction: Direction,
) {
let stream = match direction {
Direction::Incoming => match ECIESStream::incoming(stream, secret_key).await {
Ok(stream) => stream,
Err(error) => {
let _ = events
.send(PendingSessionEvent::EciesAuthError { remote_addr, session_id, error })
.await;
return
}
},
Direction::Outgoing(remote_node_id) => {
match ECIESStream::connect(stream, secret_key, remote_node_id).await {
Ok(stream) => stream,
Err(error) => {
let _ = events
.send(PendingSessionEvent::EciesAuthError {
remote_addr,
session_id,
error,
})
.await;
return
}
}
}
};
let unauthed = UnauthedEthStream::new(stream);
let auth = authenticate_stream(unauthed, session_id, remote_addr, direction).boxed();
match futures::future::select(disconnect_rx, auth).await {
Either::Left((_, _)) => {
let _ = events
.send(PendingSessionEvent::Disconnected {
remote_addr,
session_id,
direction,
error: None,
})
.await;
}
Either::Right((res, _)) => {
let _ = events.send(res).await;
}
}
}
/// Authenticate the stream via handshake
///
/// On Success return the authenticated stream as [`PendingSessionEvent`]
async fn authenticate_stream(
_stream: UnauthedEthStream<ECIESStream<TcpStream>>,
_session_id: SessionId,
_remote_addr: SocketAddr,
_direction: Direction,
) -> PendingSessionEvent {
todo!()
}

View File

@ -0,0 +1,217 @@
//! Keeps track of the state of the network.
use crate::{
discovery::{Discovery, DiscoveryEvent},
fetch::StateFetcher,
message::{Capabilities, CapabilityResponse},
peers::{PeerAction, PeersManager},
session::PeerMessageSender,
NodeId,
};
use futures::FutureExt;
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{H256, U256};
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tokio::sync::oneshot;
use tracing::trace;
/// The [`NetworkState`] keeps track of the state of all peers in the network.
///
/// This includes:
/// - [`Discovery`]: manages the discovery protocol, essentially a stream of discovery updates
/// - [`PeersManager`]: keeps track of connected peers and issues new outgoing connections
/// depending on the configured capacity.
/// - [`StateFetcher`]: streams download request (received from outside via channel) which are
/// then send to the session of the peer.
///
/// This type is also responsible for responding for received request.
pub struct NetworkState<C> {
/// All connected peers and their state.
connected_peers: HashMap<NodeId, ConnectedPeer>,
/// Manages connections to peers.
peers_manager: PeersManager,
/// Tracks the state of connected peers
peers_state: HashMap<NodeId, PeerSessionState>,
/// Buffered messages until polled.
queued_messages: VecDeque<StateAction>,
/// The client type that can interact with the chain.
client: Arc<C>,
/// Network discovery.
discovery: Discovery,
/// The type that handles requests.
///
/// The fetcher streams RLPx related requests on a per-peer basis to this type. This type will
/// then queue in the request and notify the fetcher once the result has been received.
state_fetcher: StateFetcher,
}
impl<C> NetworkState<C>
where
C: BlockProvider,
{
/// Create a new state instance with the given params
pub(crate) fn new(client: Arc<C>, discovery: Discovery, peers_manager: PeersManager) -> Self {
Self {
connected_peers: Default::default(),
peers_manager,
peers_state: Default::default(),
queued_messages: Default::default(),
client,
discovery,
state_fetcher: Default::default(),
}
}
/// Event hook for an authenticated session for the peer.
pub(crate) fn on_session_authenticated(
&mut self,
_node_id: NodeId,
_capabilities: Arc<Capabilities>,
_messages: PeerMessageSender,
) {
// TODO notify fetecher as well
}
/// Event hook for a disconnected session for the peer.
pub(crate) fn on_session_closed(&mut self, _node_id: NodeId) {}
/// Propagates Block to peers.
pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) {
// TODO propagate the newblock messages to all connected peers that haven't seen the block
// yet
todo!()
}
/// Event hook for events received from the discovery service.
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
match event {
DiscoveryEvent::Discovered(node, addr) => {
self.peers_manager.add_discovered_node(node, addr);
}
}
}
/// Event hook for new actions derived from the peer management set.
fn on_peer_action(&mut self, action: PeerAction) {
match action {
PeerAction::Connect { node_id, remote_addr } => {
self.peers_state.insert(node_id, PeerSessionState::Connecting);
self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr });
}
PeerAction::Disconnect { node_id } => {
self.peers_state.remove(&node_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id });
}
}
}
/// Disconnect the session
fn disconnect_session(&mut self, _node: NodeId) {}
/// Invoked when received a response from a connected peer.
fn on_response(&mut self, _node: NodeId, _resp: CapabilityResponse) {}
/// Advances the state
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction> {
loop {
// drain buffered messages
if let Some(message) = self.queued_messages.pop_front() {
return Poll::Ready(message)
}
while let Poll::Ready(discovery) = self.discovery.poll(cx) {
self.on_discovery_event(discovery);
}
let mut disconnect_sessions = Vec::new();
let mut received_responses = Vec::new();
// poll all connected peers for responses
for (id, peer) in self.connected_peers.iter_mut() {
if let Some(response) = peer.pending_response.as_mut() {
match response.poll_unpin(cx) {
Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)),
Poll::Ready(Err(_)) => {
trace!(
?id,
target = "net",
"Request canceled, response channel closed."
);
disconnect_sessions.push(*id);
}
Poll::Pending => continue,
};
}
// request has either returned a response or was canceled here
peer.pending_response.take();
}
for node in disconnect_sessions {
self.disconnect_session(node)
}
for (id, resp) in received_responses {
self.on_response(id, resp);
}
// poll peer manager
while let Poll::Ready(action) = self.peers_manager.poll(cx) {
self.on_peer_action(action);
}
if self.queued_messages.is_empty() {
return Poll::Pending
}
}
}
}
/// Tracks the state of a Peer.
///
/// For example known blocks,so we can decide what to announce.
pub struct ConnectedPeer {
/// Best block of the peer.
pub(crate) best_hash: H256,
/// Best block number of the peer.
pub(crate) best_number: U256,
/// A communication channel directly to the session service.
pub(crate) message_tx: PeerMessageSender,
/// The response receiver for a currently active request to that peer.
pub(crate) pending_response: Option<oneshot::Receiver<CapabilityResponse>>,
}
/// Tracks the current state of the peer session
pub enum PeerSessionState {
/// Starting state for outbound connections.
///
/// This will be triggered by a [`PeerAction::Connect`] action.
/// The peer will reside in the state until the connection has been authenticated.
Connecting,
/// Established connection that hasn't been authenticated yet.
Incoming {
/// How long to keep this open.
until: Instant,
sender: PeerMessageSender,
},
/// Node is connected to the peer and is ready to
Ready {
/// Communication channel directly to the session task
sender: PeerMessageSender,
},
}
/// Message variants triggered by the [`State`]
pub enum StateAction {
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: NodeId },
/// Disconnect an existing connection
Disconnect { node_id: NodeId },
}

View File

@ -0,0 +1,242 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{Capabilities, CapabilityMessage},
session::{SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
NodeId,
};
use futures::Stream;
use reth_ecies::ECIESError;
use reth_interfaces::provider::BlockProvider;
use std::{
io,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tracing::warn;
/// Contains the connectivity related state of the network.
///
/// A swarm emits [`SwarmEvent`]s when polled.
///
/// The manages the [`ConnectionListener`] and delegates new incoming connections to the
/// [`SessionsManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
#[must_use = "Swarm does nothing unless polled"]
pub struct Swarm<C> {
/// Listens for new incoming connections.
incoming: ConnectionListener,
/// All sessions.
sessions: SessionManager,
/// Tracks the entire state of the network and handles events received from the sessions.
state: NetworkState<C>,
}
// === impl Swarm ===
impl<C> Swarm<C>
where
C: BlockProvider,
{
/// Configures a new swarm instance.
pub(crate) fn new(
incoming: ConnectionListener,
sessions: SessionManager,
state: NetworkState<C>,
) -> Self {
Self { incoming, sessions, state }
}
/// Mutable access to the state.
pub(crate) fn state_mut(&mut self) -> &mut NetworkState<C> {
&mut self.state
}
/// Triggers a new outgoing connection to the given node
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: NodeId) {
self.sessions.dial_outbound(remote_addr, remote_id)
}
/// Handles a polled [`SessionEvent`]
fn on_session_event(&mut self, event: SessionEvent) -> Option<SwarmEvent> {
match event {
SessionEvent::SessionAuthenticated { node_id, remote_addr, capabilities, messages } => {
self.state.on_session_authenticated(node_id, capabilities, messages);
Some(SwarmEvent::SessionEstablished { node_id, remote_addr })
}
SessionEvent::ValidMessage { node_id, message } => {
Some(SwarmEvent::CapabilityMessage { node_id, message })
}
SessionEvent::InvalidMessage { node_id, capabilities, message } => {
Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message })
}
SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
}
SessionEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error } => {
Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error })
}
SessionEvent::Disconnected { node_id, remote_addr } => {
self.state.on_session_closed(node_id);
Some(SwarmEvent::SessionClosed { node_id, remote_addr })
}
SessionEvent::OutgoingConnectionError { remote_addr, node_id, error } => {
Some(SwarmEvent::OutgoingConnectionError { node_id, remote_addr, error })
}
}
}
/// Callback for events produced by [`ConnectionListener`].
///
/// Depending on the event, this will produce a new [`SwarmEvent`].
fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent> {
match event {
ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
ListenerEvent::ListenerClosed { local_address: address } => {
return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
}
ListenerEvent::Incoming { stream, remote_addr } => {
match self.sessions.on_incoming(stream, remote_addr) {
Ok(session_id) => {
return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
}
Err(err) => {
warn!(?err, "Incoming connection rejected");
}
}
}
}
None
}
/// Hook for actions pulled from the state
fn on_state_action(&mut self, event: StateAction) -> Option<SwarmEvent> {
match event {
StateAction::Connect { remote_addr, node_id } => {
self.sessions.dial_outbound(remote_addr, node_id);
}
StateAction::Disconnect { node_id } => {
self.sessions.disconnect(node_id);
}
}
None
}
}
impl<C> Stream for Swarm<C>
where
C: BlockProvider,
{
type Item = SwarmEvent;
/// This advances all components.
///
/// Processes, delegates (internal) commands received from the [`NetworkManager`], then polls
/// the [`SessionManager`] which yields messages produced by individual peer sessions that are
/// then handled. Least priority are incoming connections that are handled and delegated to
/// the [`SessionManager`] to turn them into a session.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
while let Poll::Ready(action) = this.state.poll(cx) {
if let Some(event) = this.on_state_action(action) {
return Poll::Ready(Some(event))
}
}
// poll all sessions
match this.sessions.poll(cx) {
Poll::Pending => {}
Poll::Ready(event) => {
if let Some(event) = this.on_session_event(event) {
return Poll::Ready(Some(event))
}
continue
}
}
// poll listener for incoming connections
match Pin::new(&mut this.incoming).poll(cx) {
Poll::Pending => {}
Poll::Ready(event) => {
if let Some(event) = this.on_connection(event) {
return Poll::Ready(Some(event))
}
continue
}
}
return Poll::Pending
}
}
}
/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
/// network.
pub enum SwarmEvent {
/// Events related to the actual network protocol.
CapabilityMessage {
/// The peer that sent the message
node_id: NodeId,
/// Message received from the peer
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidCapabilityMessage {
node_id: NodeId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
message: CapabilityMessage,
},
/// The underlying tcp listener closed.
TcpListenerClosed {
/// Address of the closed listener.
remote_addr: SocketAddr,
},
/// The underlying tcp listener encountered an error that we bubble up.
TcpListenerError(io::Error),
/// Received an incoming tcp connection.
///
/// This represents the first step in the session authentication process. The swarm will
/// produce subsequent events once the stream has been authenticated, or was rejected.
IncomingTcpConnection {
/// The internal session identifier under which this connection is currently tracked.
session_id: SessionId,
/// Address of the remote peer.
remote_addr: SocketAddr,
},
/// An outbound connection is initiated.
OutgoingTcpConnection {
/// Address of the remote peer.
remote_addr: SocketAddr,
},
SessionEstablished {
node_id: NodeId,
remote_addr: SocketAddr,
},
SessionClosed {
node_id: NodeId,
remote_addr: SocketAddr,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed {
remote_addr: SocketAddr,
error: Option<ECIESError>,
},
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: NodeId,
error: Option<ECIESError>,
},
/// Failed to establish a tcp stream to the given address/node
OutgoingConnectionError {
remote_addr: SocketAddr,
node_id: NodeId,
error: io::Error,
},
}

View File

@ -0,0 +1,86 @@
//! Transaction management for the p2p network.
use crate::{manager::NetworkEvent, NetworkHandle};
use reth_primitives::{Transaction, H256};
use reth_transaction_pool::TransactionPool;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// Api to interact with [`TransactionsManager`] task.
pub struct TransactionsHandle {
/// Command channel to the [`TransactionsManager`]
manager_tx: mpsc::UnboundedSender<TransactionsCommand>,
}
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service while
/// [`TransactionsHandle`] is used as frontend to send commands to.
///
/// The [`TransactionsManager`] is responsible for:
/// - handling incoming eth messages for transactions.
/// - serving transaction requests.
/// - propagate transactions
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
/// - receives incoming network messages.
/// - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool> {
/// Access to the transaction pool.
pool: Pool,
/// Network access.
network: NetworkHandle,
/// Subscriptions to all network related events.
///
/// From which we get all new incoming transaction related messages.
network_events: UnboundedReceiverStream<NetworkEvent>,
/// All currently pending transactions
pending_transactions: (),
/// All the peers that have sent the same transactions.
peers: HashMap<H256, Vec<()>>,
/// Send half for the command channel.
command_tx: mpsc::UnboundedSender<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
command_rx: UnboundedReceiverStream<TransactionsCommand>,
}
// === impl TransactionsManager ===
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool<Transaction = Transaction>,
{
/// Sets up a new instance.
pub fn new(network: NetworkHandle, pool: Pool) -> Self {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();
Self {
pool,
network,
network_events: UnboundedReceiverStream::new(network_events),
pending_transactions: (),
peers: Default::default(),
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
}
}
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle {
TransactionsHandle { manager_tx: self.command_tx.clone() }
}
/// Executes an endless future
pub async fn run(self) {}
}
/// Commands to send to the [`TransactionManager`]
enum TransactionsCommand {
Propagate(H256),
}