feat(net): authenticate sessions (#178)

* Switch stream type of ActiveSession to EthStream

 * Start `StatusBuilder` for initializing the `Status` message required
   for the handshake
 * Add `Hardfork` for `Status` default forkid
 * Add `MAINNET_GENESIS` constant

* finish `StatusBuilder`

* initialize eth streams in session

 * add status, hello, and fork filter to session manager
 * fix status builder example
 * add status and hello to network config
   * will probably remove

* removing status and hello from networkconfig

* move forkid to primitives

* change imports for forkid

* add hardfork to primitives

* remove hardfork and forkid from eth-wire

* fix remaining eth-wire forkid references

* put mainnet genesis in constants, remove NodeId

* replace NodeId with PeerId

 * the only NodeId remaining is inherited from enr
 * PeerId still needs to be documented
 * also run cargo fmt

* replace loop with iter().any()

* ignore missing docs for hardforks

* use correct PeerId for Discv4::bind example test

* document PeerId as secp256k1 public key

* cargo fmt

* temporarily allow too_many_arguments

 * the authenticate and start_pending_incoming_session methods have many
   arguments, we can reconsider the lint or fix the methods at a later
   point
This commit is contained in:
Dan Cline
2022-11-14 12:03:05 -05:00
committed by GitHub
parent 5ca2cab97f
commit f1e6639374
31 changed files with 714 additions and 207 deletions

View File

@ -1,7 +1,6 @@
use crate::{peers::PeersConfig, session::SessionsConfig};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_eth_wire::forkid::ForkId;
use reth_primitives::{Chain, H256};
use reth_primitives::{Chain, ForkId, H256};
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
@ -76,8 +75,12 @@ pub struct NetworkConfigBuilder<C> {
peers_config: Option<PeersConfig>,
/// How to configure the sessions manager
sessions_config: Option<SessionsConfig>,
/// A fork identifier as defined by EIP-2124.
/// Serves as the chain compatibility identifier.
fork_id: Option<ForkId>,
/// The network's chain id
chain: Chain,
/// Network genesis hash
genesis_hash: H256,
}

View File

@ -1,8 +1,9 @@
//! Discovery support for the network.
use crate::{error::NetworkError, NodeId};
use crate::error::NetworkError;
use futures::StreamExt;
use reth_discv4::{Discv4, Discv4Config, NodeRecord, TableUpdate};
use reth_primitives::PeerId;
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
@ -19,7 +20,7 @@ pub struct Discovery {
/// All nodes discovered via discovery protocol.
///
/// These nodes can be ephemeral and are updated via the discovery protocol.
discovered_nodes: HashMap<NodeId, SocketAddr>,
discovered_nodes: HashMap<PeerId, SocketAddr>,
/// Local ENR of the discovery service.
local_enr: NodeRecord,
/// Handler to interact with the Discovery v4 service
@ -66,12 +67,12 @@ impl Discovery {
}
/// Returns the id with which the local identifies itself in the network
pub(crate) fn local_id(&self) -> NodeId {
pub(crate) fn local_id(&self) -> PeerId {
self.local_enr.id
}
/// Manually adds an address to the set.
pub(crate) fn add_known_address(&mut self, node_id: NodeId, addr: SocketAddr) {
pub(crate) fn add_known_address(&mut self, node_id: PeerId, addr: SocketAddr) {
self.on_discv4_update(TableUpdate::Added(NodeRecord {
address: addr.ip(),
tcp_port: addr.port(),
@ -81,7 +82,7 @@ impl Discovery {
}
/// Returns all nodes we know exist in the network.
pub fn known_nodes(&mut self) -> &HashMap<NodeId, SocketAddr> {
pub fn known_nodes(&mut self) -> &HashMap<PeerId, SocketAddr> {
&self.discovered_nodes
}
@ -131,7 +132,7 @@ impl Discovery {
/// Events produced by the [`Discovery`] manager.
pub enum DiscoveryEvent {
/// A new node was discovered
Discovered(NodeId, SocketAddr),
Discovered(PeerId, SocketAddr),
}
#[cfg(test)]

View File

@ -1,10 +1,10 @@
//! Fetch data from the network.
use crate::{message::BlockRequest, NodeId};
use crate::message::BlockRequest;
use futures::StreamExt;
use reth_eth_wire::{BlockBody, EthMessage};
use reth_interfaces::p2p::{error::RequestResult, headers::client::HeadersRequest};
use reth_primitives::{Header, H256, U256};
use reth_primitives::{Header, PeerId, H256, U256};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
@ -19,9 +19,9 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// peers and sends the response once ready.
pub struct StateFetcher {
/// Currently active [`GetBlockHeaders`] requests
inflight_headers_requests: HashMap<NodeId, Request<HeadersRequest, RequestResult<Vec<Header>>>>,
inflight_headers_requests: HashMap<PeerId, Request<HeadersRequest, RequestResult<Vec<Header>>>>,
/// The list of available peers for requests.
peers: HashMap<NodeId, Peer>,
peers: HashMap<PeerId, Peer>,
/// Requests queued for processing
queued_requests: VecDeque<DownloadRequest>,
/// Receiver for new incoming download requests
@ -34,13 +34,13 @@ pub struct StateFetcher {
impl StateFetcher {
/// Invoked when connected to a new peer.
pub(crate) fn new_connected_peer(&mut self, _node_id: NodeId, _best_hash: H256) {}
pub(crate) fn new_connected_peer(&mut self, _node_id: PeerId, _best_hash: H256) {}
/// Invoked when an active session was closed.
pub(crate) fn on_session_closed(&mut self, _peer: &NodeId) {}
pub(crate) fn on_session_closed(&mut self, _peer: &PeerId) {}
/// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, _peer: &NodeId) {}
pub(crate) fn on_pending_disconnect(&mut self, _peer: &PeerId) {}
/// Returns the next action to return
fn poll_action(&mut self) -> Option<FetchAction> {
@ -94,7 +94,7 @@ impl StateFetcher {
/// Called on a `GetBlockHeaders` response from a peer
pub(crate) fn on_block_headers_response(
&mut self,
_peer: NodeId,
_peer: PeerId,
_res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> {
None
@ -103,7 +103,7 @@ impl StateFetcher {
/// Called on a `GetBlockBodies` response from a peer
pub(crate) fn on_block_bodies_response(
&mut self,
_peer: NodeId,
_peer: PeerId,
_res: RequestResult<Vec<BlockBody>>,
) -> Option<BlockResponseOutcome> {
None
@ -189,7 +189,7 @@ enum DownloadRequest {
pub(crate) enum FetchAction {
/// Dispatch an eth request to the given peer.
EthRequest {
node_id: NodeId,
node_id: PeerId,
/// The request to send
request: EthMessage,
},
@ -201,8 +201,8 @@ pub(crate) enum FetchAction {
#[derive(Debug)]
pub(crate) enum BlockResponseOutcome {
/// Continue with another request to the peer.
Request(NodeId, BlockRequest),
Request(PeerId, BlockRequest),
/// How to handle a bad response
// TODO this should include some form of reputation change
BadResponse(NodeId),
BadResponse(PeerId),
}

View File

@ -5,7 +5,7 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
// TODO remove later
#![allow(dead_code)]
#![allow(dead_code, clippy::too_many_arguments)]
//! reth P2P networking.
//!
@ -29,9 +29,6 @@ mod state;
mod swarm;
mod transactions;
/// Identifier for a unique node
pub type NodeId = reth_discv4::NodeId;
pub use config::NetworkConfig;
pub use manager::NetworkManager;
pub use network::NetworkHandle;

View File

@ -25,7 +25,6 @@ use crate::{
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
NodeId,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
@ -34,6 +33,7 @@ use reth_eth_wire::{
EthMessage,
};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::PeerId;
use std::{
net::SocketAddr,
pin::Pin,
@ -88,8 +88,8 @@ pub struct NetworkManager<C> {
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_active_peers: Arc<AtomicUsize>,
/// Local copy of the `NodeId` of the local node.
local_node_id: NodeId,
/// Local copy of the `PeerId` of the local node.
local_node_id: PeerId,
}
// === impl NetworkManager ===
@ -163,7 +163,7 @@ where
/// Event hook for an unexpected message from the peer.
fn on_invalid_message(
&self,
node_id: NodeId,
node_id: PeerId,
_capabilities: Arc<Capabilities>,
_message: CapabilityMessage,
) {
@ -172,7 +172,7 @@ where
}
/// Handles a received [`CapabilityMessage`] from the peer.
fn on_capability_message(&mut self, _node_id: NodeId, msg: CapabilityMessage) {
fn on_capability_message(&mut self, _node_id: PeerId, msg: CapabilityMessage) {
match msg {
CapabilityMessage::Eth(eth) => {
match eth {
@ -299,7 +299,7 @@ where
/// Events emitted by the network that are of interest for subscribers.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
EthMessage { node_id: NodeId, message: EthMessage },
EthMessage { node_id: PeerId, message: EthMessage },
}
/// Bundles all listeners for [`NetworkEvent`]s.

View File

@ -11,10 +11,9 @@ use reth_eth_wire::{
};
use std::task::{ready, Context, Poll};
use crate::NodeId;
use reth_eth_wire::capability::CapabilityMessage;
use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{Header, Receipt, TransactionSigned};
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
/// Represents all messages that can be sent to a peer session
@ -180,7 +179,7 @@ impl PeerResponseResult {
#[derive(Debug, Clone)]
pub struct PeerRequestSender {
/// id of the remote node.
pub(crate) peer: NodeId,
pub(crate) peer: PeerId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<PeerRequest>,
}

View File

@ -1,6 +1,6 @@
use crate::{manager::NetworkEvent, peers::PeersHandle, NodeId};
use crate::{manager::NetworkEvent, peers::PeersHandle};
use parking_lot::Mutex;
use reth_primitives::{H256, U256};
use reth_primitives::{PeerId, H256, U256};
use std::{
net::SocketAddr,
sync::{atomic::AtomicUsize, Arc},
@ -24,7 +24,7 @@ impl NetworkHandle {
num_active_peers: Arc<AtomicUsize>,
listener_address: Arc<Mutex<SocketAddr>>,
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
local_node_id: NodeId,
local_node_id: PeerId,
peers: PeersHandle,
) -> Self {
let inner = NetworkInner {
@ -57,7 +57,7 @@ struct NetworkInner {
/// The local address that accepts incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// The identifier used by this node.
local_node_id: NodeId,
local_node_id: PeerId,
/// Access to the all the nodes
peers: PeersHandle, // TODO need something to access
}

View File

@ -1,5 +1,5 @@
use futures::StreamExt;
use reth_discv4::NodeId;
use reth_primitives::PeerId;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::SocketAddr,
@ -32,7 +32,7 @@ pub struct PeersHandle {
/// The [`PeersManager`] will be notified on peer related changes
pub(crate) struct PeersManager {
/// All peers known to the network
peers: HashMap<NodeId, Peer>,
peers: HashMap<PeerId, Peer>,
/// Copy of the receiver half, so new [`PeersHandle`] can be created on demand.
manager_tx: mpsc::UnboundedSender<PeerCommand>,
/// Receiver half of the command channel.
@ -74,7 +74,7 @@ impl PeersManager {
///
/// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
/// be scheduled.
pub(crate) fn on_active_session(&mut self, peer_id: NodeId, addr: SocketAddr) {
pub(crate) fn on_active_session(&mut self, peer_id: PeerId, addr: SocketAddr) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
@ -96,7 +96,7 @@ impl PeersManager {
/// Called when a session to a peer was disconnected.
///
/// Accepts an additional [`ReputationChange`] value to apply to the peer.
pub(crate) fn on_disconnected(&mut self, peer: NodeId, reputation_change: ReputationChange) {
pub(crate) fn on_disconnected(&mut self, peer: PeerId, reputation_change: ReputationChange) {
if let Some(mut peer) = self.peers.get_mut(&peer) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
@ -108,7 +108,7 @@ impl PeersManager {
///
/// If the peer already exists, then the address will e updated. If the addresses differ, the
/// old address is returned
pub(crate) fn add_discovered_node(&mut self, peer_id: NodeId, addr: SocketAddr) {
pub(crate) fn add_discovered_node(&mut self, peer_id: PeerId, addr: SocketAddr) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let node = entry.get_mut();
@ -121,7 +121,7 @@ impl PeersManager {
}
/// Removes the tracked node from the set.
pub(crate) fn remove_discovered_node(&mut self, peer_id: NodeId) {
pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) {
if let Some(entry) = self.peers.remove(&peer_id) {
if entry.state.is_connected() {
self.connection_info.decr_state(entry.state);
@ -133,11 +133,11 @@ impl PeersManager {
/// Returns the idle peer with the highest reputation.
///
/// Returns `None` if no peer is available.
fn best_unconnected(&mut self) -> Option<(NodeId, &mut Peer)> {
fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
self.peers
.iter_mut()
.filter(|(_, peer)| peer.state.is_unconnected())
.fold(None::<(&NodeId, &mut Peer)>, |mut best_peer, candidate| {
.fold(None::<(&PeerId, &mut Peer)>, |mut best_peer, candidate| {
if let Some(best_peer) = best_peer.take() {
if best_peer.1.reputation >= candidate.1.reputation {
return Some(best_peer)
@ -331,14 +331,14 @@ pub(crate) enum PeerCommand {
/// Command for manually add
Add {
/// Identifier of the peer.
peer_id: NodeId,
peer_id: PeerId,
/// The address of the peer
addr: SocketAddr,
},
/// Remove a peer from the set
///
/// If currently connected this will disconnect the sessin
Remove(NodeId),
Remove(PeerId),
}
/// Actions the peer manager can trigger.
@ -347,17 +347,17 @@ pub enum PeerAction {
/// Start a new connection to a peer.
Connect {
/// The peer to connect to.
peer_id: NodeId,
peer_id: PeerId,
/// Where to reach the node
remote_addr: SocketAddr,
},
/// Disconnect an existing connection.
Disconnect { peer_id: NodeId },
Disconnect { peer_id: PeerId },
/// Disconnect an existing incoming connection, because the peers reputation is below the
/// banned threshold.
DisconnectBannedIncoming {
/// Peer id of the established connection.
peer_id: NodeId,
peer_id: PeerId,
},
}

View File

@ -6,13 +6,16 @@ use crate::{
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
NodeId,
};
use fnv::FnvHashMap;
use futures::{stream::Fuse, Sink, Stream};
use pin_project::pin_project;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
EthStream, P2PStream,
};
use reth_primitives::PeerId;
use std::{
collections::VecDeque,
future::Future,
@ -31,9 +34,9 @@ pub(crate) struct ActiveSession {
pub(crate) next_id: usize,
/// The underlying connection.
#[pin]
pub(crate) conn: ECIESStream<TcpStream>,
pub(crate) conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
/// Identifier of the node we're connected to.
pub(crate) remote_node_id: NodeId,
pub(crate) remote_node_id: PeerId,
/// All capabilities the peer announced
pub(crate) remote_capabilities: Arc<Capabilities>,
/// Internal identifier of this session

View File

@ -1,13 +1,12 @@
//! Session handles
use crate::{
session::{Direction, SessionId},
NodeId,
};
use crate::session::{Direction, SessionId};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
Status,
error::EthStreamError,
EthStream, P2PStream, Status,
};
use reth_primitives::PeerId;
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
net::TcpStream,
@ -33,7 +32,7 @@ pub(crate) struct ActiveSessionHandle {
/// The assigned id for this session
pub(crate) session_id: SessionId,
/// The identifier of the remote peer
pub(crate) remote_id: NodeId,
pub(crate) remote_id: PeerId,
/// The timestamp when the session has been established.
pub(crate) established: Instant,
/// Announced capabilities of the peer.
@ -65,23 +64,24 @@ pub(crate) enum PendingSessionEvent {
Established {
session_id: SessionId,
remote_addr: SocketAddr,
node_id: NodeId,
/// The remote node's public key
node_id: PeerId,
capabilities: Arc<Capabilities>,
status: Status,
conn: ECIESStream<TcpStream>,
conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {
remote_addr: SocketAddr,
session_id: SessionId,
direction: Direction,
error: Option<ECIESError>,
error: Option<EthStreamError>,
},
/// Thrown when unable to establish a [`TcpStream`].
OutgoingConnectionError {
remote_addr: SocketAddr,
session_id: SessionId,
node_id: NodeId,
node_id: PeerId,
error: io::Error,
},
/// Thrown when authentication via Ecies failed.
@ -101,18 +101,18 @@ pub(crate) enum SessionCommand {
#[derive(Debug)]
pub(crate) enum ActiveSessionMessage {
/// Session disconnected.
Closed { node_id: NodeId, remote_addr: SocketAddr },
Closed { node_id: PeerId, remote_addr: SocketAddr },
/// A session received a valid message via RLPx.
ValidMessage {
/// Identifier of the remote peer.
node_id: NodeId,
node_id: PeerId,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
/// Identifier of the remote peer.
node_id: NodeId,
node_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.

View File

@ -1,21 +1,20 @@
//! Support for handling peer sessions.
pub use crate::message::PeerRequestSender;
use crate::{
session::{
active::ActiveSession,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
},
use crate::session::{
active::ActiveSession,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
},
NodeId,
};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
Status, UnauthedEthStream,
error::EthStreamError,
HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
};
use reth_primitives::{ForkFilter, Hardfork, PeerId};
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::HashMap,
@ -48,7 +47,13 @@ pub(crate) struct SessionManager {
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The node id of node
node_id: NodeId,
node_id: PeerId,
/// The `Status` message to send to peers.
status: Status,
/// THe `Hello` message to send to peers.
hello: HelloMessage,
/// The [`ForkFilter`] used to validate the peer's `Status` message.
fork_filter: ForkFilter,
/// Size of the command buffer per session.
session_command_buffer: usize,
/// All spawned session tasks.
@ -61,7 +66,7 @@ pub(crate) struct SessionManager {
/// session is authenticated, it can be moved to the `active_session` set.
pending_sessions: FnvHashMap<SessionId, PendingSessionHandle>,
/// All active sessions that are ready to exchange messages.
active_sessions: HashMap<NodeId, ActiveSessionHandle>,
active_sessions: HashMap<PeerId, ActiveSessionHandle>,
/// The original Sender half of the [`PendingSessionEvent`] channel.
///
/// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
@ -87,12 +92,21 @@ impl SessionManager {
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
let pk = secret_key.public_key(SECP256K1);
let node_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
let node_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]);
// TODO: make sure this is the right place to put these builders - maybe per-Network rather
// than per-Session?
let hello = HelloBuilder::new(node_id).build();
let status = StatusBuilder::default().build();
let fork_filter = Hardfork::Frontier.fork_filter();
Self {
next_id: 0,
secret_key,
node_id,
status,
hello,
fork_filter,
session_command_buffer: config.session_command_buffer,
spawned_tasks: Default::default(),
pending_sessions: Default::default(),
@ -139,6 +153,9 @@ impl SessionManager {
pending_events,
remote_addr,
self.secret_key,
self.hello.clone(),
self.status,
self.fork_filter.clone(),
));
let handle = PendingSessionHandle { disconnect_tx };
@ -147,7 +164,7 @@ impl SessionManager {
}
/// Starts a new pending session from the local node to the given remote node.
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: NodeId) {
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: PeerId) {
let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
@ -158,6 +175,9 @@ impl SessionManager {
remote_addr,
remote_node_id,
self.secret_key,
self.hello.clone(),
self.status,
self.fork_filter.clone(),
));
let handle = PendingSessionHandle { disconnect_tx };
@ -168,7 +188,7 @@ impl SessionManager {
///
/// This will trigger the disconnect on the session task to gracefully terminate. The result
/// will be picked up by the receiver.
pub(crate) fn disconnect(&self, node: NodeId) {
pub(crate) fn disconnect(&self, node: PeerId) {
if let Some(session) = self.active_sessions.get(&node) {
session.disconnect();
}
@ -376,7 +396,7 @@ pub(crate) enum SessionEvent {
///
/// This session is now able to exchange data.
SessionEstablished {
node_id: NodeId,
node_id: PeerId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
status: Status,
@ -384,30 +404,30 @@ pub(crate) enum SessionEvent {
},
/// A session received a valid message via RLPx.
ValidMessage {
node_id: NodeId,
node_id: PeerId,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
node_id: NodeId,
node_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
message: CapabilityMessage,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option<ECIESError> },
IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option<EthStreamError> },
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: NodeId,
error: Option<ECIESError>,
node_id: PeerId,
error: Option<EthStreamError>,
},
/// Failed to establish a tcp stream
OutgoingConnectionError { remote_addr: SocketAddr, node_id: NodeId, error: io::Error },
OutgoingConnectionError { remote_addr: SocketAddr, node_id: PeerId, error: io::Error },
/// Active session was disconnected.
Disconnected { node_id: NodeId, remote_addr: SocketAddr },
Disconnected { node_id: PeerId, remote_addr: SocketAddr },
}
/// The error thrown when the max configured limit has been reached and no more connections are
@ -426,6 +446,9 @@ async fn start_pending_incoming_session(
events: mpsc::Sender<PendingSessionEvent>,
remote_addr: SocketAddr,
secret_key: SecretKey,
hello: HelloMessage,
status: Status,
fork_filter: ForkFilter,
) {
authenticate(
disconnect_rx,
@ -435,6 +458,9 @@ async fn start_pending_incoming_session(
remote_addr,
secret_key,
Direction::Incoming,
hello,
status,
fork_filter,
)
.await
}
@ -446,8 +472,11 @@ async fn start_pending_outbound_session(
events: mpsc::Sender<PendingSessionEvent>,
session_id: SessionId,
remote_addr: SocketAddr,
remote_node_id: NodeId,
remote_node_id: PeerId,
secret_key: SecretKey,
hello: HelloMessage,
status: Status,
fork_filter: ForkFilter,
) {
let stream = match TcpStream::connect(remote_addr).await {
Ok(stream) => stream,
@ -471,6 +500,9 @@ async fn start_pending_outbound_session(
remote_addr,
secret_key,
Direction::Outgoing(remote_node_id),
hello,
status,
fork_filter,
)
.await
}
@ -481,7 +513,7 @@ pub(crate) enum Direction {
/// Incoming connection.
Incoming,
/// Outgoing connection to a specific node.
Outgoing(NodeId),
Outgoing(PeerId),
}
async fn authenticate(
@ -492,6 +524,9 @@ async fn authenticate(
remote_addr: SocketAddr,
secret_key: SecretKey,
direction: Direction,
hello: HelloMessage,
status: Status,
fork_filter: ForkFilter,
) {
let stream = match direction {
Direction::Incoming => match ECIESStream::incoming(stream, secret_key).await {
@ -520,8 +555,17 @@ async fn authenticate(
}
};
let unauthed = UnauthedEthStream::new(stream);
let auth = authenticate_stream(unauthed, session_id, remote_addr, direction).boxed();
let unauthed = UnauthedP2PStream::new(stream);
let auth = authenticate_stream(
unauthed,
session_id,
remote_addr,
direction,
hello,
status,
fork_filter,
)
.boxed();
match futures::future::select(disconnect_rx, auth).await {
Either::Left((_, _)) => {
@ -544,10 +588,47 @@ async fn authenticate(
///
/// On Success return the authenticated stream as [`PendingSessionEvent`]
async fn authenticate_stream(
_stream: UnauthedEthStream<ECIESStream<TcpStream>>,
_session_id: SessionId,
_remote_addr: SocketAddr,
_direction: Direction,
stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
session_id: SessionId,
remote_addr: SocketAddr,
direction: Direction,
hello: HelloMessage,
status: Status,
fork_filter: ForkFilter,
) -> PendingSessionEvent {
todo!()
// conduct the p2p handshake and return the authenticated stream
let (p2p_stream, their_hello) = match stream.handshake(hello).await {
Ok(stream_res) => stream_res,
Err(err) => {
return PendingSessionEvent::Disconnected {
remote_addr,
session_id,
direction,
error: Some(err.into()),
}
}
};
// if the hello handshake was successful we can try status handshake
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
Ok(stream_res) => stream_res,
Err(err) => {
return PendingSessionEvent::Disconnected {
remote_addr,
session_id,
direction,
error: Some(err),
}
}
};
PendingSessionEvent::Established {
session_id,
remote_addr,
node_id: their_hello.id,
capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
status: their_status,
conn: eth_stream,
}
}

View File

@ -5,12 +5,11 @@ use crate::{
fetch::StateFetcher,
message::{PeerRequestSender, PeerResponse},
peers::{PeerAction, PeersManager},
NodeId,
};
use reth_eth_wire::{capability::Capabilities, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::H256;
use reth_primitives::{PeerId, H256};
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
@ -37,7 +36,7 @@ use tracing::trace;
/// This type is also responsible for responding for received request.
pub struct NetworkState<C> {
/// All connected peers and their state.
connected_peers: HashMap<NodeId, ConnectedPeer>,
connected_peers: HashMap<PeerId, ConnectedPeer>,
/// Manages connections to peers.
peers_manager: PeersManager,
/// Buffered messages until polled.
@ -83,7 +82,7 @@ where
/// should be rejected.
pub(crate) fn on_session_activated(
&mut self,
peer: NodeId,
peer: PeerId,
capabilities: Arc<Capabilities>,
status: Status,
request_tx: PeerRequestSender,
@ -107,7 +106,7 @@ where
}
/// Event hook for a disconnected session for the peer.
pub(crate) fn on_session_closed(&mut self, peer: NodeId) {
pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
self.connected_peers.remove(&peer);
self.state_fetcher.on_session_closed(&peer);
}
@ -149,7 +148,7 @@ where
}
/// Disconnect the session
fn on_session_disconnected(&mut self, peer: NodeId) {
fn on_session_disconnected(&mut self, peer: PeerId) {
self.connected_peers.remove(&peer);
}
@ -157,7 +156,7 @@ where
///
/// Caution: this will replace an already pending response. It's the responsibility of the
/// caller to select the peer.
fn handle_block_request(&mut self, peer: NodeId, request: BlockRequest) {
fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
if let Some(ref mut peer) = self.connected_peers.get_mut(&peer) {
let (request, response) = match request {
BlockRequest::GetBlockHeaders(request) => {
@ -192,7 +191,7 @@ where
}
/// Invoked when received a response from a connected peer.
fn on_eth_response(&mut self, peer: NodeId, resp: PeerResponseResult) -> Option<StateAction> {
fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult) -> Option<StateAction> {
match resp {
PeerResponseResult::BlockHeaders(res) => {
let outcome = self.state_fetcher.on_block_headers_response(peer, res)?;
@ -283,9 +282,9 @@ pub struct ConnectedPeer {
/// Message variants triggered by the [`State`]
pub enum StateAction {
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: NodeId },
Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection
Disconnect { node_id: NodeId },
Disconnect { node_id: PeerId },
}
#[derive(Debug, thiserror::Error)]
@ -293,6 +292,6 @@ pub enum AddSessionError {
#[error("No capacity for new sessions")]
AtCapacity {
/// The peer of the session
peer: NodeId,
peer: PeerId,
},
}

View File

@ -2,12 +2,14 @@ use crate::{
listener::{ConnectionListener, ListenerEvent},
session::{SessionEvent, SessionId, SessionManager},
state::{AddSessionError, NetworkState, StateAction},
NodeId,
};
use futures::Stream;
use reth_ecies::ECIESError;
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::PeerId;
use std::{
io,
net::SocketAddr,
@ -55,7 +57,7 @@ where
}
/// Triggers a new outgoing connection to the given node
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: NodeId) {
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
self.sessions.dial_outbound(remote_addr, remote_id)
}
@ -191,13 +193,13 @@ pub enum SwarmEvent {
/// Events related to the actual network protocol.
CapabilityMessage {
/// The peer that sent the message
node_id: NodeId,
node_id: PeerId,
/// Message received from the peer
message: CapabilityMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidCapabilityMessage {
node_id: NodeId,
node_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
@ -226,28 +228,28 @@ pub enum SwarmEvent {
remote_addr: SocketAddr,
},
SessionEstablished {
node_id: NodeId,
node_id: PeerId,
remote_addr: SocketAddr,
},
SessionClosed {
node_id: NodeId,
node_id: PeerId,
remote_addr: SocketAddr,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed {
remote_addr: SocketAddr,
error: Option<ECIESError>,
error: Option<EthStreamError>,
},
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: NodeId,
error: Option<ECIESError>,
node_id: PeerId,
error: Option<EthStreamError>,
},
/// Failed to establish a tcp stream to the given address/node
OutgoingConnectionError {
remote_addr: SocketAddr,
node_id: NodeId,
node_id: PeerId,
error: io::Error,
},
}