feat(net): impl active session (#219)

* feat(net): impl active stream

* rename fields

* work on active

* feat(net): add disconnect function

* more work on active session

* feat(net): add broadcast message variants

* feat: impl session future

* misc: refactor

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Matthias Seitz
2022-11-21 11:31:03 +01:00
committed by GitHub
parent 4936d467c9
commit f7c6ae5858
9 changed files with 523 additions and 147 deletions

View File

@ -72,12 +72,12 @@ impl Discovery {
}
/// Manually adds an address to the set.
pub(crate) fn add_known_address(&mut self, node_id: PeerId, addr: SocketAddr) {
pub(crate) fn add_known_address(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.on_discv4_update(TableUpdate::Added(NodeRecord {
address: addr.ip(),
tcp_port: addr.port(),
udp_port: addr.port(),
id: node_id,
id: peer_id,
}))
}

View File

@ -94,7 +94,7 @@ pub struct NetworkManager<C> {
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_active_peers: Arc<AtomicUsize>,
/// Local copy of the `PeerId` of the local node.
local_node_id: PeerId,
local_peer_id: PeerId,
}
// === impl NetworkManager ===
@ -130,7 +130,7 @@ where
let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;
// need to retrieve the addr here since provided port could be `0`
let local_node_id = discovery.local_id();
let local_peer_id = discovery.local_id();
let sessions = SessionManager::new(secret_key, sessions_config);
let state = NetworkState::new(client, discovery, peers_manger, genesis_hash);
@ -144,7 +144,7 @@ where
Arc::clone(&num_active_peers),
Arc::clone(&listener_address),
to_manager_tx,
local_node_id,
local_peer_id,
peers_handle,
network_mode,
);
@ -158,7 +158,7 @@ where
event_listeners: Default::default(),
to_transactions: None,
num_active_peers,
local_node_id,
local_peer_id,
})
}
@ -177,11 +177,11 @@ where
/// Event hook for an unexpected message from the peer.
fn on_invalid_message(
&self,
node_id: PeerId,
peer_id: PeerId,
_capabilities: Arc<Capabilities>,
_message: CapabilityMessage,
) {
trace!(?node_id, target = "net", "received unexpected message");
trace!(?peer_id, target = "net", "received unexpected message");
// TODO: disconnect?
}
@ -344,11 +344,11 @@ where
while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) {
// handle event
match event {
SwarmEvent::ValidMessage { node_id, message } => {
this.on_peer_message(node_id, message)
SwarmEvent::ValidMessage { peer_id, message } => {
this.on_peer_message(peer_id, message)
}
SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => {
this.on_invalid_message(node_id, capabilities, message)
SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
this.on_invalid_message(peer_id, capabilities, message)
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(?remote_addr, target = "net", "TCP listener closed.");
@ -362,12 +362,7 @@ where
SwarmEvent::OutgoingTcpConnection { remote_addr } => {
trace!(?remote_addr, target = "net", "Starting outbound connection.");
}
SwarmEvent::SessionEstablished {
node_id: peer_id,
remote_addr,
capabilities,
messages,
} => {
SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
trace!(
?remote_addr,
@ -383,12 +378,13 @@ where
messages,
});
}
SwarmEvent::SessionClosed { node_id: peer_id, remote_addr } => {
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
trace!(
?remote_addr,
?peer_id,
?total_active,
?error,
target = "net",
"Session disconnected"
);

View File

@ -5,9 +5,10 @@
use futures::FutureExt;
use reth_eth_wire::{
capability::CapabilityMessage, BlockBodies, BlockBody, BlockHeaders, GetBlockBodies,
GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes,
NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Transactions,
capability::CapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders,
EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
Transactions,
};
use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256};
@ -44,7 +45,7 @@ pub enum PeerMessage {
NewBlock(NewBlockMessage),
/// Broadcast transactions.
Transactions(Arc<Transactions>),
///
/// Send new pooled transactions
PooledTransactions(Arc<NewPooledTransactionHashes>),
/// All `eth` request variants.
EthRequest(PeerRequest),
@ -122,6 +123,34 @@ pub enum PeerRequest {
GetReceipts { request: GetReceipts, response: oneshot::Sender<RequestResult<Receipts>> },
}
// === impl PeerRequest ===
impl PeerRequest {
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage {
match self {
PeerRequest::GetBlockHeaders { request, .. } => {
EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
}
PeerRequest::GetBlockBodies { request, .. } => {
EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
}
PeerRequest::GetPooledTransactions { request, .. } => {
EthMessage::GetPooledTransactions(RequestPair {
request_id,
message: request.clone(),
})
}
PeerRequest::GetNodeData { request, .. } => {
EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
}
PeerRequest::GetReceipts { request, .. } => {
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
}
}
}
}
/// Corresponding variant for [`PeerRequest`].
#[derive(Debug)]
pub enum PeerResponse {
@ -184,6 +213,38 @@ pub enum PeerResponseResult {
// === impl PeerResponseResult ===
impl PeerResponseResult {
/// Converts this response into an [`EthMessage`]
pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage> {
macro_rules! to_message {
($response:ident, $item:ident, $request_id:ident) => {
match $response {
Ok(res) => {
let request = RequestPair { request_id: $request_id, message: $item(res) };
Ok(EthMessage::$item(request))
}
Err(err) => Err(err),
}
};
}
match self {
PeerResponseResult::BlockHeaders(resp) => {
to_message!(resp, BlockHeaders, id)
}
PeerResponseResult::BlockBodies(resp) => {
to_message!(resp, BlockBodies, id)
}
PeerResponseResult::PooledTransactions(resp) => {
to_message!(resp, PooledTransactions, id)
}
PeerResponseResult::NodeData(resp) => {
to_message!(resp, NodeData, id)
}
PeerResponseResult::Receipts(resp) => {
to_message!(resp, Receipts, id)
}
}
}
/// Returns whether this result is an error.
pub fn is_err(&self) -> bool {
match self {
@ -200,7 +261,7 @@ impl PeerResponseResult {
#[derive(Debug, Clone)]
pub struct PeerRequestSender {
/// id of the remote node.
pub(crate) peer: PeerId,
pub(crate) peer_id: PeerId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<PeerRequest>,
}

View File

@ -25,7 +25,7 @@ impl NetworkHandle {
num_active_peers: Arc<AtomicUsize>,
listener_address: Arc<Mutex<SocketAddr>>,
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
local_node_id: PeerId,
local_peer_id: PeerId,
peers: PeersHandle,
network_mode: NetworkMode,
) -> Self {
@ -33,7 +33,7 @@ impl NetworkHandle {
num_active_peers,
to_manager_tx,
listener_address,
local_node_id,
local_peer_id,
peers,
network_mode,
};
@ -75,7 +75,7 @@ struct NetworkInner {
/// The local address that accepts incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// The identifier used by this node.
local_node_id: PeerId,
local_peer_id: PeerId,
/// Access to the all the nodes
peers: PeersHandle,
/// The mode of the network

View File

@ -1,112 +1,366 @@
//! Represents an established session.
use crate::{
message::PeerRequest,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
session::{
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
};
use fnv::FnvHashMap;
use futures::{stream::Fuse, Sink, Stream};
use pin_project::pin_project;
use futures::{stream::Fuse, SinkExt, StreamExt};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
EthStream, P2PStream,
capability::Capabilities,
error::{EthStreamError, HandshakeError},
message::{EthBroadcastMessage, RequestPair},
DisconnectReason, EthMessage, EthStream, P2PStream,
};
use reth_primitives::PeerId;
use std::{
collections::VecDeque,
future::Future,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
task::{ready, Context, Poll},
time::Instant,
};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, warn};
/// The type that advances an established session by listening for incoming messages (from local
/// node or read from connection) and emitting events back to the [`SessionHandler`].
#[pin_project]
/// node or read from connection) and emitting events back to the [`SessionsManager`].
///
/// It listens for
/// - incoming commands from the [`SessionsManager`]
/// - incoming requests via the request channel
/// - responses for handled ETH requests received from the remote peer.
pub(crate) struct ActiveSession {
/// Keeps track of request ids.
pub(crate) next_id: usize,
pub(crate) next_id: u64,
/// The underlying connection.
#[pin]
pub(crate) conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
/// Identifier of the node we're connected to.
pub(crate) remote_node_id: PeerId,
pub(crate) remote_peer_id: PeerId,
/// The address we're connected to.
pub(crate) remote_addr: SocketAddr,
/// All capabilities the peer announced
pub(crate) remote_capabilities: Arc<Capabilities>,
/// Internal identifier of this session
pub(crate) session_id: SessionId,
/// Incoming commands from the manager
#[pin]
pub(crate) commands_rx: ReceiverStream<SessionCommand>,
/// Sink to send messages to the [`SessionManager`].
pub(crate) to_session: mpsc::Sender<ActiveSessionMessage>,
/// Incoming request to send to delegate to the remote peer.
#[pin]
pub(crate) request_tx: Fuse<ReceiverStream<PeerRequest>>,
/// All requests currently in progress.
pub(crate) inflight_requests: FnvHashMap<usize, ()>,
/// Buffered messages that should be sent to the remote peer.
pub(crate) buffered_outgoing: VecDeque<CapabilityMessage>,
/// All requests sent to the remote peer we're waiting on a response
pub(crate) inflight_requests: FnvHashMap<u64, PeerRequest>,
/// All requests that were sent by the remote peer.
pub(crate) received_requests: Vec<ReceivedRequest>,
/// Buffered messages that should be handled and sent to the peer.
pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
}
impl ActiveSession {
/// Returns `true` if the session is currently in the process of disconnecting
fn is_disconnecting(&self) -> bool {
self.conn.inner().is_disconnecting()
}
/// Returns the next request id
fn next_id(&mut self) -> u64 {
let id = self.next_id;
self.next_id += 1;
id
}
/// Handle a message read from the connection.
///
/// Returns an error if the message is considered to be in violation of the protocol
fn on_incoming(&mut self, msg: EthMessage) -> Option<EthStreamError> {
/// A macro that handles an incoming request
/// This creates a new channel and tries to send the sender half to the session while
/// storing to receiver half internally so the pending response can be polled.
macro_rules! on_request {
($req:ident, $resp_item:ident, $req_item:ident) => {
let RequestPair { request_id, message: request } = $req;
let (tx, response) = oneshot::channel();
let received = ReceivedRequest {
request_id,
rx: PeerResponse::$resp_item { response },
received: Instant::now(),
};
if self
.try_emit_message(PeerMessage::EthRequest(PeerRequest::$req_item {
request,
response: tx,
}))
.is_ok()
{
self.received_requests.push(received);
}
};
}
/// Processes a response received from the peer
macro_rules! on_response {
($resp:ident, $item:ident) => {
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(resp) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = resp {
let _ = response.send(Ok(message));
} else {
// TODO handle bad response
}
} else {
// TODO handle unexpected response
}
};
}
match msg {
EthMessage::Status(_) => {
return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake))
}
EthMessage::NewBlockHashes(msg) => {
self.emit_message(PeerMessage::NewBlockHashes(Arc::new(msg)));
}
EthMessage::NewBlock(msg) => {
let block =
NewBlockMessage { hash: msg.block.header.hash_slow(), block: Arc::new(*msg) };
self.emit_message(PeerMessage::NewBlock(block));
}
EthMessage::Transactions(msg) => {
self.emit_message(PeerMessage::Transactions(Arc::new(msg)));
}
EthMessage::NewPooledTransactionHashes(msg) => {
self.emit_message(PeerMessage::PooledTransactions(Arc::new(msg)));
}
EthMessage::GetBlockHeaders(req) => {
on_request!(req, BlockHeaders, GetBlockHeaders);
}
EthMessage::BlockHeaders(resp) => {
on_response!(resp, GetBlockHeaders);
}
EthMessage::GetBlockBodies(req) => {
on_request!(req, BlockBodies, GetBlockBodies);
}
EthMessage::BlockBodies(resp) => {
on_response!(resp, GetBlockBodies);
}
EthMessage::GetPooledTransactions(req) => {
on_request!(req, PooledTransactions, GetPooledTransactions);
}
EthMessage::PooledTransactions(resp) => {
on_response!(resp, GetPooledTransactions);
}
EthMessage::GetNodeData(req) => {
on_request!(req, NodeData, GetNodeData);
}
EthMessage::NodeData(resp) => {
on_response!(resp, GetNodeData);
}
EthMessage::GetReceipts(req) => {
on_request!(req, Receipts, GetReceipts);
}
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts);
}
};
None
}
/// Handle an incoming peer request.
fn on_peer_request(&mut self, req: PeerRequest) {
let request_id = self.next_id();
let msg = req.create_request_message(request_id);
self.queued_outgoing.push_back(msg.into());
self.inflight_requests.insert(request_id, req);
}
/// Handle a message received from the internal network
fn on_peer_message(&mut self, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlockHashes(msg).into());
}
PeerMessage::NewBlock(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
}
PeerMessage::Transactions(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
}
PeerMessage::PooledTransactions(msg) => {
self.queued_outgoing
.push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into());
}
PeerMessage::EthRequest(req) => {
self.on_peer_request(req);
}
PeerMessage::Other(_) => {}
}
}
/// Handle a Response to the peer
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) {
match resp.try_into_message(id) {
Ok(msg) => {
self.queued_outgoing.push_back(msg.into());
}
Err(err) => {
error!(?err, target = "net", "Failed to respond to received request");
}
}
}
/// Send a message back to the [`SessionsManager`]
fn emit_message(&self, message: PeerMessage) {
let _ = self.try_emit_message(message).map_err(|err| {
warn!(
%err,
target = "net",
"dropping incoming message",
);
});
}
/// Send a message back to the [`SessionsManager`]
fn try_emit_message(
&self,
message: PeerMessage,
) -> Result<(), mpsc::error::TrySendError<ActiveSessionMessage>> {
self.to_session
.try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
}
/// Report back that this session has been closed.
fn disconnect(&self) {
// NOTE: we clone here so there's enough capacity to deliver this message
let _ = self.to_session.clone().try_send(ActiveSessionMessage::Disconnected {
peer_id: self.remote_peer_id,
remote_addr: self.remote_addr,
});
}
/// Report back that this session has been closed due to an error
fn close_on_error(&self, error: EthStreamError) {
// NOTE: we clone here so there's enough capacity to deliver this message
let _ = self.to_session.clone().try_send(ActiveSessionMessage::ClosedOnConnectionError {
peer_id: self.remote_peer_id,
remote_addr: self.remote_addr,
error,
});
}
}
impl Future for ActiveSession {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let this = self.get_mut();
if this.is_disconnecting() {
// try to close the flush out the remaining Disconnect message
let _ = ready!(this.conn.poll_close_unpin(cx));
this.disconnect();
return Poll::Ready(())
}
loop {
let mut progress = false;
// we prioritize incoming messages
// we prioritize incoming commands sent from the session manager
loop {
match this.commands_rx.as_mut().poll_next(cx) {
match this.commands_rx.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// this is only possible when the manager was dropped, in which case we also
// terminate this session
return Poll::Ready(())
}
Poll::Ready(Some(_cmd)) => {
Poll::Ready(Some(cmd)) => {
progress = true;
// TODO handle command
continue
match cmd {
SessionCommand::Disconnect { reason } => {
let reason =
reason.unwrap_or(DisconnectReason::DisconnectRequested);
this.conn.inner_mut().start_disconnect(reason);
}
SessionCommand::Message(msg) => {
this.on_peer_message(msg);
}
}
}
}
}
while let Poll::Ready(Some(_req)) = this.request_tx.as_mut().poll_next(cx) {
while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) {
progress = true;
// TODO handle request
this.on_peer_request(req);
}
// send and flush
while this.conn.as_mut().poll_ready(cx).is_ready() {
if let Some(_msg) = this.buffered_outgoing.pop_front() {
// Advance all active requests.
// We remove each request one by one and add them back.
for idx in (0..this.received_requests.len()).rev() {
let mut req = this.received_requests.swap_remove(idx);
match req.rx.poll(cx) {
Poll::Pending => {
// not ready yet
this.received_requests.push(req);
}
Poll::Ready(Ok(resp)) => {
this.handle_outgoing_response(req.request_id, resp);
}
Poll::Ready(Err(_)) => {
// ignore on error
}
}
}
// Send messages by advancing the sink and queuing in buffered messages
while this.conn.poll_ready_unpin(cx).is_ready() {
if let Some(msg) = this.queued_outgoing.pop_front() {
progress = true;
// TODO encode message and start send
let res = match msg {
OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg),
OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg),
};
if let Err(_err) = res {
return Poll::Ready(())
}
} else {
break
}
}
loop {
match this.conn.as_mut().poll_next(cx) {
match this.conn.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// disconnected
}
Poll::Ready(Some(_msg)) => {
Poll::Ready(None) => return Poll::Pending,
Poll::Ready(Some(res)) => {
progress = true;
match res {
Ok(msg) => {
// decode and handle message
continue
if let Some(err) = this.on_incoming(msg) {
this.close_on_error(err);
return Poll::Ready(())
}
}
Err(err) => {
this.close_on_error(err);
return Poll::Ready(())
}
}
}
}
}
@ -117,3 +371,33 @@ impl Future for ActiveSession {
}
}
}
/// Tracks a request received from the peer
pub(crate) struct ReceivedRequest {
/// Protocol Identifier
request_id: u64,
/// Receiver half of the channel that's supposed to receive the proper response.
rx: PeerResponse,
/// Timestamp when we read this msg from the wire.
received: Instant,
}
/// Outgoing messages that can be sent over the wire.
pub(crate) enum OutgoingMessage {
/// A message that is owned.
Eth(EthMessage),
/// A message that may be shared by multiple sessions.
Broadcast(EthBroadcastMessage),
}
impl From<EthMessage> for OutgoingMessage {
fn from(value: EthMessage) -> Self {
OutgoingMessage::Eth(value)
}
}
impl From<EthBroadcastMessage> for OutgoingMessage {
fn from(value: EthBroadcastMessage) -> Self {
OutgoingMessage::Broadcast(value)
}
}

View File

@ -68,7 +68,7 @@ pub(crate) enum PendingSessionEvent {
session_id: SessionId,
remote_addr: SocketAddr,
/// The remote node's public key
node_id: PeerId,
peer_id: PeerId,
capabilities: Arc<Capabilities>,
status: Status,
conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
@ -84,7 +84,7 @@ pub(crate) enum PendingSessionEvent {
OutgoingConnectionError {
remote_addr: SocketAddr,
session_id: SessionId,
node_id: PeerId,
peer_id: PeerId,
error: io::Error,
},
/// Thrown when authentication via Ecies failed.
@ -107,19 +107,26 @@ pub(crate) enum SessionCommand {
/// [`SessionManager`](crate::session::SessionManager)
#[derive(Debug)]
pub(crate) enum ActiveSessionMessage {
/// Session disconnected.
Closed { node_id: PeerId, remote_addr: SocketAddr },
/// Session was gracefully disconnected.
Disconnected { peer_id: PeerId, remote_addr: SocketAddr },
/// Session was closed due an error
ClosedOnConnectionError {
peer_id: PeerId,
remote_addr: SocketAddr,
/// The error that caused the session to close
error: EthStreamError,
},
/// A session received a valid message via RLPx.
ValidMessage {
/// Identifier of the remote peer.
node_id: PeerId,
peer_id: PeerId,
/// Message received from the peer.
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
/// Identifier of the remote peer.
node_id: PeerId,
peer_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.

View File

@ -52,7 +52,7 @@ pub(crate) struct SessionManager {
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The node id of node
node_id: PeerId,
peer_id: PeerId,
/// The `Status` message to send to peers.
status: Status,
/// THe `Hello` message to send to peers.
@ -97,18 +97,18 @@ impl SessionManager {
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
let pk = secret_key.public_key(SECP256K1);
let node_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]);
let peer_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]);
// TODO: make sure this is the right place to put these builders - maybe per-Network rather
// than per-Session?
let hello = HelloBuilder::new(node_id).build();
let hello = HelloBuilder::new(peer_id).build();
let status = StatusBuilder::default().build();
let fork_filter = Hardfork::Frontier.fork_filter();
Self {
next_id: 0,
secret_key,
node_id,
peer_id,
status,
hello,
fork_filter,
@ -169,7 +169,7 @@ impl SessionManager {
}
/// Starts a new pending session from the local node to the given remote node.
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: PeerId) {
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
@ -178,7 +178,7 @@ impl SessionManager {
pending_events,
session_id,
remote_addr,
remote_node_id,
remote_peer_id,
self.secret_key,
self.hello.clone(),
self.status,
@ -218,17 +218,33 @@ impl SessionManager {
}
Poll::Ready(Some(event)) => {
return match event {
ActiveSessionMessage::Closed { node_id, remote_addr } => {
trace!(?node_id, target = "net::session", "closed active session.");
let _ = self.active_sessions.remove(&node_id);
Poll::Ready(SessionEvent::Disconnected { node_id, remote_addr })
ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
trace!(
?peer_id,
target = "net::session",
"gracefully disconnected active session."
);
let _ = self.active_sessions.remove(&peer_id);
Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
}
ActiveSessionMessage::ValidMessage { node_id, message } => {
// TODO: since all messages are known they should be decoded in the session
Poll::Ready(SessionEvent::ValidMessage { node_id, message })
ActiveSessionMessage::ClosedOnConnectionError {
peer_id,
remote_addr,
error,
} => {
trace!(?peer_id, ?error, target = "net::session", "closed session.");
let _ = self.active_sessions.remove(&peer_id);
Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
remote_addr,
peer_id,
error,
})
}
ActiveSessionMessage::InvalidMessage { node_id, capabilities, message } => {
Poll::Ready(SessionEvent::InvalidMessage { node_id, message, capabilities })
ActiveSessionMessage::ValidMessage { peer_id, message } => {
Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
}
ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => {
Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities })
}
}
}
@ -253,7 +269,7 @@ impl SessionManager {
PendingSessionEvent::Established {
session_id,
remote_addr,
node_id,
peer_id,
capabilities,
conn,
status,
@ -266,11 +282,12 @@ impl SessionManager {
let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
let messages = PeerRequestSender { peer: node_id, to_session_tx };
let messages = PeerRequestSender { peer_id, to_session_tx };
let session = ActiveSession {
next_id: 0,
remote_node_id: node_id,
remote_peer_id: peer_id,
remote_addr,
remote_capabilities: Arc::clone(&capabilities),
session_id,
commands_rx: ReceiverStream::new(commands_rx),
@ -278,23 +295,24 @@ impl SessionManager {
request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
buffered_outgoing: Default::default(),
queued_outgoing: Default::default(),
received_requests: Default::default(),
};
self.spawn(session);
let handle = ActiveSessionHandle {
session_id,
remote_id: node_id,
remote_id: peer_id,
established: Instant::now(),
capabilities: Arc::clone(&capabilities),
commands_to_session,
};
self.active_sessions.insert(node_id, handle);
self.active_sessions.insert(peer_id, handle);
return Poll::Ready(SessionEvent::SessionEstablished {
node_id,
peer_id,
remote_addr,
capabilities,
status,
@ -316,10 +334,10 @@ impl SessionManager {
error,
})
}
Direction::Outgoing(node_id) => {
Direction::Outgoing(peer_id) => {
Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
remote_addr,
node_id,
peer_id,
error,
})
}
@ -328,14 +346,14 @@ impl SessionManager {
PendingSessionEvent::OutgoingConnectionError {
remote_addr,
session_id,
node_id,
peer_id,
error,
} => {
trace!(
?error,
?session_id,
?remote_addr,
?node_id,
?peer_id,
target = "net::session",
"connection refused"
);
@ -408,7 +426,7 @@ pub(crate) enum SessionEvent {
///
/// This session is now able to exchange data.
SessionEstablished {
node_id: PeerId,
peer_id: PeerId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
status: Status,
@ -416,13 +434,13 @@ pub(crate) enum SessionEvent {
},
/// A session received a valid message via RLPx.
ValidMessage {
node_id: PeerId,
peer_id: PeerId,
/// Message received from the peer.
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
node_id: PeerId,
peer_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
@ -433,13 +451,22 @@ pub(crate) enum SessionEvent {
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: PeerId,
peer_id: PeerId,
error: Option<EthStreamError>,
},
/// Failed to establish a tcp stream
OutgoingConnectionError { remote_addr: SocketAddr, node_id: PeerId, error: io::Error },
/// Active session was disconnected.
Disconnected { node_id: PeerId, remote_addr: SocketAddr },
OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
/// Session was closed due to an error
SessionClosedOnConnectionError {
/// The id of the remote peer.
peer_id: PeerId,
/// The socket we were connected to.
remote_addr: SocketAddr,
/// The error that caused the session to close
error: EthStreamError,
},
/// Active session was gracefully disconnected.
Disconnected { peer_id: PeerId, remote_addr: SocketAddr },
}
/// The error thrown when the max configured limit has been reached and no more connections are
@ -478,13 +505,13 @@ async fn start_pending_incoming_session(
}
/// Starts the authentication process for a connection initiated by a remote peer.
#[instrument(skip_all, fields(%remote_addr, node_id), target = "net")]
#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
async fn start_pending_outbound_session(
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent>,
session_id: SessionId,
remote_addr: SocketAddr,
remote_node_id: PeerId,
remote_peer_id: PeerId,
secret_key: SecretKey,
hello: HelloMessage,
status: Status,
@ -497,7 +524,7 @@ async fn start_pending_outbound_session(
.send(PendingSessionEvent::OutgoingConnectionError {
remote_addr,
session_id,
node_id: remote_node_id,
peer_id: remote_peer_id,
error,
})
.await;
@ -511,7 +538,7 @@ async fn start_pending_outbound_session(
session_id,
remote_addr,
secret_key,
Direction::Outgoing(remote_node_id),
Direction::Outgoing(remote_peer_id),
hello,
status,
fork_filter,
@ -550,8 +577,8 @@ async fn authenticate(
return
}
},
Direction::Outgoing(remote_node_id) => {
match ECIESStream::connect(stream, secret_key, remote_node_id).await {
Direction::Outgoing(remote_peer_id) => {
match ECIESStream::connect(stream, secret_key, remote_peer_id).await {
Ok(stream) => stream,
Err(error) => {
let _ = events
@ -638,7 +665,7 @@ async fn authenticate_stream(
PendingSessionEvent::Established {
session_id,
remote_addr,
node_id: their_hello.id,
peer_id: their_hello.id,
capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
status: their_status,
conn: eth_stream,

View File

@ -223,8 +223,7 @@ where
fn on_peer_action(&mut self, action: PeerAction) {
match action {
PeerAction::Connect { peer_id, remote_addr } => {
self.queued_messages
.push_back(StateAction::Connect { node_id: peer_id, remote_addr });
self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
}
PeerAction::Disconnect { peer_id, reason } => {
self.state_fetcher.on_pending_disconnect(&peer_id);
@ -388,7 +387,7 @@ pub enum StateAction {
hashes: Arc<NewBlockHashes>,
},
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId },
Connect { remote_addr: SocketAddr, peer_id: PeerId },
/// Disconnect an existing connection
Disconnect {
peer_id: PeerId,

View File

@ -72,19 +72,19 @@ where
fn on_session_event(&mut self, event: SessionEvent) -> Option<SwarmEvent> {
match event {
SessionEvent::SessionEstablished {
node_id,
peer_id,
remote_addr,
capabilities,
status,
messages,
} => match self.state.on_session_activated(
node_id,
peer_id,
capabilities.clone(),
status,
messages.clone(),
) {
Ok(_) => Some(SwarmEvent::SessionEstablished {
node_id,
peer_id,
remote_addr,
capabilities,
messages,
@ -98,24 +98,31 @@ where
None
}
},
SessionEvent::ValidMessage { node_id, message } => {
Some(SwarmEvent::ValidMessage { node_id, message })
SessionEvent::ValidMessage { peer_id, message } => {
Some(SwarmEvent::ValidMessage { peer_id, message })
}
SessionEvent::InvalidMessage { node_id, capabilities, message } => {
Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message })
SessionEvent::InvalidMessage { peer_id, capabilities, message } => {
Some(SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message })
}
SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
}
SessionEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error } => {
Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error })
SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
}
SessionEvent::Disconnected { node_id, remote_addr } => {
self.state.on_session_closed(node_id);
Some(SwarmEvent::SessionClosed { node_id, remote_addr })
SessionEvent::Disconnected { peer_id, remote_addr } => {
self.state.on_session_closed(peer_id);
Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
}
SessionEvent::OutgoingConnectionError { remote_addr, node_id, error } => {
Some(SwarmEvent::OutgoingConnectionError { node_id, remote_addr, error })
SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
self.state.on_session_closed(peer_id);
// TODO(mattsse): reputation change on error
Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
}
SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
}
}
}
@ -146,11 +153,11 @@ where
/// Hook for actions pulled from the state
fn on_state_action(&mut self, event: StateAction) -> Option<SwarmEvent> {
match event {
StateAction::Connect { remote_addr, node_id } => {
self.sessions.dial_outbound(remote_addr, node_id);
StateAction::Connect { remote_addr, peer_id } => {
self.sessions.dial_outbound(remote_addr, peer_id);
}
StateAction::Disconnect { peer_id: node_id, reason } => {
self.sessions.disconnect(node_id, reason);
StateAction::Disconnect { peer_id, reason } => {
self.sessions.disconnect(peer_id, reason);
}
StateAction::NewBlock { peer_id, block: msg } => {
let msg = PeerMessage::NewBlock(msg);
@ -220,13 +227,13 @@ pub enum SwarmEvent {
/// Events related to the actual network protocol.
ValidMessage {
/// The peer that sent the message
node_id: PeerId,
peer_id: PeerId,
/// Message received from the peer
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidCapabilityMessage {
node_id: PeerId,
peer_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
/// Message received from the peer.
@ -255,30 +262,25 @@ pub enum SwarmEvent {
remote_addr: SocketAddr,
},
SessionEstablished {
node_id: PeerId,
peer_id: PeerId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
messages: PeerRequestSender,
},
SessionClosed {
node_id: PeerId,
remote_addr: SocketAddr,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed {
peer_id: PeerId,
remote_addr: SocketAddr,
/// Whether the session was closed due to an error
error: Option<EthStreamError>,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option<EthStreamError> },
/// Closed an outgoing pending session during authentication.
OutgoingPendingSessionClosed {
remote_addr: SocketAddr,
node_id: PeerId,
peer_id: PeerId,
error: Option<EthStreamError>,
},
/// Failed to establish a tcp stream to the given address/node
OutgoingConnectionError {
remote_addr: SocketAddr,
node_id: PeerId,
error: io::Error,
},
OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
}