From f7c6ae585881d326a9cc2e10acbfe26726b68918 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 21 Nov 2022 11:31:03 +0100 Subject: [PATCH] feat(net): impl active session (#219) * feat(net): impl active stream * rename fields * work on active * feat(net): add disconnect function * more work on active session * feat(net): add broadcast message variants * feat: impl session future * misc: refactor Co-authored-by: Georgios Konstantopoulos --- crates/net/network/src/discovery.rs | 4 +- crates/net/network/src/manager.rs | 30 +- crates/net/network/src/message.rs | 71 ++++- crates/net/network/src/network.rs | 6 +- crates/net/network/src/session/active.rs | 362 ++++++++++++++++++++--- crates/net/network/src/session/handle.rs | 19 +- crates/net/network/src/session/mod.rs | 107 ++++--- crates/net/network/src/state.rs | 5 +- crates/net/network/src/swarm.rs | 66 +++-- 9 files changed, 523 insertions(+), 147 deletions(-) diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index b2eb288f5..b8a93f294 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -72,12 +72,12 @@ impl Discovery { } /// Manually adds an address to the set. - pub(crate) fn add_known_address(&mut self, node_id: PeerId, addr: SocketAddr) { + pub(crate) fn add_known_address(&mut self, peer_id: PeerId, addr: SocketAddr) { self.on_discv4_update(TableUpdate::Added(NodeRecord { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), - id: node_id, + id: peer_id, })) } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 7ea95e9ff..fd0cba7ab 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -94,7 +94,7 @@ pub struct NetworkManager { /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. num_active_peers: Arc, /// Local copy of the `PeerId` of the local node. - local_node_id: PeerId, + local_peer_id: PeerId, } // === impl NetworkManager === @@ -130,7 +130,7 @@ where let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?; // need to retrieve the addr here since provided port could be `0` - let local_node_id = discovery.local_id(); + let local_peer_id = discovery.local_id(); let sessions = SessionManager::new(secret_key, sessions_config); let state = NetworkState::new(client, discovery, peers_manger, genesis_hash); @@ -144,7 +144,7 @@ where Arc::clone(&num_active_peers), Arc::clone(&listener_address), to_manager_tx, - local_node_id, + local_peer_id, peers_handle, network_mode, ); @@ -158,7 +158,7 @@ where event_listeners: Default::default(), to_transactions: None, num_active_peers, - local_node_id, + local_peer_id, }) } @@ -177,11 +177,11 @@ where /// Event hook for an unexpected message from the peer. fn on_invalid_message( &self, - node_id: PeerId, + peer_id: PeerId, _capabilities: Arc, _message: CapabilityMessage, ) { - trace!(?node_id, target = "net", "received unexpected message"); + trace!(?peer_id, target = "net", "received unexpected message"); // TODO: disconnect? } @@ -344,11 +344,11 @@ where while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) { // handle event match event { - SwarmEvent::ValidMessage { node_id, message } => { - this.on_peer_message(node_id, message) + SwarmEvent::ValidMessage { peer_id, message } => { + this.on_peer_message(peer_id, message) } - SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => { - this.on_invalid_message(node_id, capabilities, message) + SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { + this.on_invalid_message(peer_id, capabilities, message) } SwarmEvent::TcpListenerClosed { remote_addr } => { trace!(?remote_addr, target = "net", "TCP listener closed."); @@ -362,12 +362,7 @@ where SwarmEvent::OutgoingTcpConnection { remote_addr } => { trace!(?remote_addr, target = "net", "Starting outbound connection."); } - SwarmEvent::SessionEstablished { - node_id: peer_id, - remote_addr, - capabilities, - messages, - } => { + SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => { let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; trace!( ?remote_addr, @@ -383,12 +378,13 @@ where messages, }); } - SwarmEvent::SessionClosed { node_id: peer_id, remote_addr } => { + SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; trace!( ?remote_addr, ?peer_id, ?total_active, + ?error, target = "net", "Session disconnected" ); diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 7613cac77..e659b8054 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -5,9 +5,10 @@ use futures::FutureExt; use reth_eth_wire::{ - capability::CapabilityMessage, BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, - GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, - NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Transactions, + capability::CapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, + EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, + NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, + Transactions, }; use reth_interfaces::p2p::error::RequestResult; use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256}; @@ -44,7 +45,7 @@ pub enum PeerMessage { NewBlock(NewBlockMessage), /// Broadcast transactions. Transactions(Arc), - /// + /// Send new pooled transactions PooledTransactions(Arc), /// All `eth` request variants. EthRequest(PeerRequest), @@ -122,6 +123,34 @@ pub enum PeerRequest { GetReceipts { request: GetReceipts, response: oneshot::Sender> }, } +// === impl PeerRequest === + +impl PeerRequest { + /// Returns the [`EthMessage`] for this type + pub fn create_request_message(&self, request_id: u64) -> EthMessage { + match self { + PeerRequest::GetBlockHeaders { request, .. } => { + EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request }) + } + PeerRequest::GetBlockBodies { request, .. } => { + EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() }) + } + PeerRequest::GetPooledTransactions { request, .. } => { + EthMessage::GetPooledTransactions(RequestPair { + request_id, + message: request.clone(), + }) + } + PeerRequest::GetNodeData { request, .. } => { + EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() }) + } + PeerRequest::GetReceipts { request, .. } => { + EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) + } + } + } +} + /// Corresponding variant for [`PeerRequest`]. #[derive(Debug)] pub enum PeerResponse { @@ -184,6 +213,38 @@ pub enum PeerResponseResult { // === impl PeerResponseResult === impl PeerResponseResult { + /// Converts this response into an [`EthMessage`] + pub fn try_into_message(self, id: u64) -> RequestResult { + macro_rules! to_message { + ($response:ident, $item:ident, $request_id:ident) => { + match $response { + Ok(res) => { + let request = RequestPair { request_id: $request_id, message: $item(res) }; + Ok(EthMessage::$item(request)) + } + Err(err) => Err(err), + } + }; + } + match self { + PeerResponseResult::BlockHeaders(resp) => { + to_message!(resp, BlockHeaders, id) + } + PeerResponseResult::BlockBodies(resp) => { + to_message!(resp, BlockBodies, id) + } + PeerResponseResult::PooledTransactions(resp) => { + to_message!(resp, PooledTransactions, id) + } + PeerResponseResult::NodeData(resp) => { + to_message!(resp, NodeData, id) + } + PeerResponseResult::Receipts(resp) => { + to_message!(resp, Receipts, id) + } + } + } + /// Returns whether this result is an error. pub fn is_err(&self) -> bool { match self { @@ -200,7 +261,7 @@ impl PeerResponseResult { #[derive(Debug, Clone)] pub struct PeerRequestSender { /// id of the remote node. - pub(crate) peer: PeerId, + pub(crate) peer_id: PeerId, /// The Sender half connected to a session. pub(crate) to_session_tx: mpsc::Sender, } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 9ae672015..503d3e967 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -25,7 +25,7 @@ impl NetworkHandle { num_active_peers: Arc, listener_address: Arc>, to_manager_tx: UnboundedSender, - local_node_id: PeerId, + local_peer_id: PeerId, peers: PeersHandle, network_mode: NetworkMode, ) -> Self { @@ -33,7 +33,7 @@ impl NetworkHandle { num_active_peers, to_manager_tx, listener_address, - local_node_id, + local_peer_id, peers, network_mode, }; @@ -75,7 +75,7 @@ struct NetworkInner { /// The local address that accepts incoming connections. listener_address: Arc>, /// The identifier used by this node. - local_node_id: PeerId, + local_peer_id: PeerId, /// Access to the all the nodes peers: PeersHandle, /// The mode of the network diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index a8a1519dd..34816c7b5 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -1,112 +1,366 @@ //! Represents an established session. use crate::{ - message::PeerRequest, + message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, session::{ handle::{ActiveSessionMessage, SessionCommand}, SessionId, }, }; use fnv::FnvHashMap; -use futures::{stream::Fuse, Sink, Stream}; -use pin_project::pin_project; +use futures::{stream::Fuse, SinkExt, StreamExt}; use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ - capability::{Capabilities, CapabilityMessage}, - EthStream, P2PStream, + capability::Capabilities, + error::{EthStreamError, HandshakeError}, + message::{EthBroadcastMessage, RequestPair}, + DisconnectReason, EthMessage, EthStream, P2PStream, }; + use reth_primitives::PeerId; use std::{ collections::VecDeque, future::Future, + net::SocketAddr, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::{ready, Context, Poll}, + time::Instant, +}; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, }; -use tokio::{net::TcpStream, sync::mpsc}; use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, warn}; /// The type that advances an established session by listening for incoming messages (from local -/// node or read from connection) and emitting events back to the [`SessionHandler`]. -#[pin_project] +/// node or read from connection) and emitting events back to the [`SessionsManager`]. +/// +/// It listens for +/// - incoming commands from the [`SessionsManager`] +/// - incoming requests via the request channel +/// - responses for handled ETH requests received from the remote peer. pub(crate) struct ActiveSession { /// Keeps track of request ids. - pub(crate) next_id: usize, + pub(crate) next_id: u64, /// The underlying connection. - #[pin] pub(crate) conn: EthStream>>, /// Identifier of the node we're connected to. - pub(crate) remote_node_id: PeerId, + pub(crate) remote_peer_id: PeerId, + /// The address we're connected to. + pub(crate) remote_addr: SocketAddr, /// All capabilities the peer announced pub(crate) remote_capabilities: Arc, /// Internal identifier of this session pub(crate) session_id: SessionId, /// Incoming commands from the manager - #[pin] pub(crate) commands_rx: ReceiverStream, /// Sink to send messages to the [`SessionManager`]. pub(crate) to_session: mpsc::Sender, /// Incoming request to send to delegate to the remote peer. - #[pin] pub(crate) request_tx: Fuse>, - /// All requests currently in progress. - pub(crate) inflight_requests: FnvHashMap, - /// Buffered messages that should be sent to the remote peer. - pub(crate) buffered_outgoing: VecDeque, + /// All requests sent to the remote peer we're waiting on a response + pub(crate) inflight_requests: FnvHashMap, + /// All requests that were sent by the remote peer. + pub(crate) received_requests: Vec, + /// Buffered messages that should be handled and sent to the peer. + pub(crate) queued_outgoing: VecDeque, +} + +impl ActiveSession { + /// Returns `true` if the session is currently in the process of disconnecting + fn is_disconnecting(&self) -> bool { + self.conn.inner().is_disconnecting() + } + + /// Returns the next request id + fn next_id(&mut self) -> u64 { + let id = self.next_id; + self.next_id += 1; + id + } + + /// Handle a message read from the connection. + /// + /// Returns an error if the message is considered to be in violation of the protocol + fn on_incoming(&mut self, msg: EthMessage) -> Option { + /// A macro that handles an incoming request + /// This creates a new channel and tries to send the sender half to the session while + /// storing to receiver half internally so the pending response can be polled. + macro_rules! on_request { + ($req:ident, $resp_item:ident, $req_item:ident) => { + let RequestPair { request_id, message: request } = $req; + let (tx, response) = oneshot::channel(); + let received = ReceivedRequest { + request_id, + rx: PeerResponse::$resp_item { response }, + received: Instant::now(), + }; + if self + .try_emit_message(PeerMessage::EthRequest(PeerRequest::$req_item { + request, + response: tx, + })) + .is_ok() + { + self.received_requests.push(received); + } + }; + } + + /// Processes a response received from the peer + macro_rules! on_response { + ($resp:ident, $item:ident) => { + let RequestPair { request_id, message } = $resp; + #[allow(clippy::collapsible_match)] + if let Some(resp) = self.inflight_requests.remove(&request_id) { + if let PeerRequest::$item { response, .. } = resp { + let _ = response.send(Ok(message)); + } else { + // TODO handle bad response + } + } else { + // TODO handle unexpected response + } + }; + } + + match msg { + EthMessage::Status(_) => { + return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake)) + } + EthMessage::NewBlockHashes(msg) => { + self.emit_message(PeerMessage::NewBlockHashes(Arc::new(msg))); + } + EthMessage::NewBlock(msg) => { + let block = + NewBlockMessage { hash: msg.block.header.hash_slow(), block: Arc::new(*msg) }; + self.emit_message(PeerMessage::NewBlock(block)); + } + EthMessage::Transactions(msg) => { + self.emit_message(PeerMessage::Transactions(Arc::new(msg))); + } + EthMessage::NewPooledTransactionHashes(msg) => { + self.emit_message(PeerMessage::PooledTransactions(Arc::new(msg))); + } + EthMessage::GetBlockHeaders(req) => { + on_request!(req, BlockHeaders, GetBlockHeaders); + } + EthMessage::BlockHeaders(resp) => { + on_response!(resp, GetBlockHeaders); + } + EthMessage::GetBlockBodies(req) => { + on_request!(req, BlockBodies, GetBlockBodies); + } + EthMessage::BlockBodies(resp) => { + on_response!(resp, GetBlockBodies); + } + EthMessage::GetPooledTransactions(req) => { + on_request!(req, PooledTransactions, GetPooledTransactions); + } + EthMessage::PooledTransactions(resp) => { + on_response!(resp, GetPooledTransactions); + } + EthMessage::GetNodeData(req) => { + on_request!(req, NodeData, GetNodeData); + } + EthMessage::NodeData(resp) => { + on_response!(resp, GetNodeData); + } + EthMessage::GetReceipts(req) => { + on_request!(req, Receipts, GetReceipts); + } + EthMessage::Receipts(resp) => { + on_response!(resp, GetReceipts); + } + }; + + None + } + + /// Handle an incoming peer request. + fn on_peer_request(&mut self, req: PeerRequest) { + let request_id = self.next_id(); + let msg = req.create_request_message(request_id); + self.queued_outgoing.push_back(msg.into()); + self.inflight_requests.insert(request_id, req); + } + + /// Handle a message received from the internal network + fn on_peer_message(&mut self, msg: PeerMessage) { + match msg { + PeerMessage::NewBlockHashes(msg) => { + self.queued_outgoing.push_back(EthBroadcastMessage::NewBlockHashes(msg).into()); + } + PeerMessage::NewBlock(msg) => { + self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into()); + } + PeerMessage::Transactions(msg) => { + self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into()); + } + PeerMessage::PooledTransactions(msg) => { + self.queued_outgoing + .push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into()); + } + PeerMessage::EthRequest(req) => { + self.on_peer_request(req); + } + PeerMessage::Other(_) => {} + } + } + + /// Handle a Response to the peer + fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) { + match resp.try_into_message(id) { + Ok(msg) => { + self.queued_outgoing.push_back(msg.into()); + } + Err(err) => { + error!(?err, target = "net", "Failed to respond to received request"); + } + } + } + + /// Send a message back to the [`SessionsManager`] + fn emit_message(&self, message: PeerMessage) { + let _ = self.try_emit_message(message).map_err(|err| { + warn!( + %err, + target = "net", + "dropping incoming message", + ); + }); + } + + /// Send a message back to the [`SessionsManager`] + fn try_emit_message( + &self, + message: PeerMessage, + ) -> Result<(), mpsc::error::TrySendError> { + self.to_session + .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message }) + } + + /// Report back that this session has been closed. + fn disconnect(&self) { + // NOTE: we clone here so there's enough capacity to deliver this message + let _ = self.to_session.clone().try_send(ActiveSessionMessage::Disconnected { + peer_id: self.remote_peer_id, + remote_addr: self.remote_addr, + }); + } + + /// Report back that this session has been closed due to an error + fn close_on_error(&self, error: EthStreamError) { + // NOTE: we clone here so there's enough capacity to deliver this message + let _ = self.to_session.clone().try_send(ActiveSessionMessage::ClosedOnConnectionError { + peer_id: self.remote_peer_id, + remote_addr: self.remote_addr, + error, + }); + } } impl Future for ActiveSession { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.get_mut(); + + if this.is_disconnecting() { + // try to close the flush out the remaining Disconnect message + let _ = ready!(this.conn.poll_close_unpin(cx)); + this.disconnect(); + return Poll::Ready(()) + } loop { let mut progress = false; - // we prioritize incoming messages + + // we prioritize incoming commands sent from the session manager loop { - match this.commands_rx.as_mut().poll_next(cx) { + match this.commands_rx.poll_next_unpin(cx) { Poll::Pending => break, Poll::Ready(None) => { // this is only possible when the manager was dropped, in which case we also // terminate this session return Poll::Ready(()) } - Poll::Ready(Some(_cmd)) => { + Poll::Ready(Some(cmd)) => { progress = true; - // TODO handle command - - continue + match cmd { + SessionCommand::Disconnect { reason } => { + let reason = + reason.unwrap_or(DisconnectReason::DisconnectRequested); + this.conn.inner_mut().start_disconnect(reason); + } + SessionCommand::Message(msg) => { + this.on_peer_message(msg); + } + } } } } - while let Poll::Ready(Some(_req)) = this.request_tx.as_mut().poll_next(cx) { + while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) { progress = true; - // TODO handle request + this.on_peer_request(req); } - // send and flush - while this.conn.as_mut().poll_ready(cx).is_ready() { - if let Some(_msg) = this.buffered_outgoing.pop_front() { + // Advance all active requests. + // We remove each request one by one and add them back. + for idx in (0..this.received_requests.len()).rev() { + let mut req = this.received_requests.swap_remove(idx); + match req.rx.poll(cx) { + Poll::Pending => { + // not ready yet + this.received_requests.push(req); + } + Poll::Ready(Ok(resp)) => { + this.handle_outgoing_response(req.request_id, resp); + } + Poll::Ready(Err(_)) => { + // ignore on error + } + } + } + + // Send messages by advancing the sink and queuing in buffered messages + while this.conn.poll_ready_unpin(cx).is_ready() { + if let Some(msg) = this.queued_outgoing.pop_front() { progress = true; - // TODO encode message and start send + let res = match msg { + OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg), + OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg), + }; + if let Err(_err) = res { + return Poll::Ready(()) + } } else { break } } loop { - match this.conn.as_mut().poll_next(cx) { + match this.conn.poll_next_unpin(cx) { Poll::Pending => break, - Poll::Ready(None) => { - // disconnected - } - Poll::Ready(Some(_msg)) => { + Poll::Ready(None) => return Poll::Pending, + Poll::Ready(Some(res)) => { progress = true; - // decode and handle message - - continue + match res { + Ok(msg) => { + // decode and handle message + if let Some(err) = this.on_incoming(msg) { + this.close_on_error(err); + return Poll::Ready(()) + } + } + Err(err) => { + this.close_on_error(err); + return Poll::Ready(()) + } + } } } } @@ -117,3 +371,33 @@ impl Future for ActiveSession { } } } + +/// Tracks a request received from the peer +pub(crate) struct ReceivedRequest { + /// Protocol Identifier + request_id: u64, + /// Receiver half of the channel that's supposed to receive the proper response. + rx: PeerResponse, + /// Timestamp when we read this msg from the wire. + received: Instant, +} + +/// Outgoing messages that can be sent over the wire. +pub(crate) enum OutgoingMessage { + /// A message that is owned. + Eth(EthMessage), + /// A message that may be shared by multiple sessions. + Broadcast(EthBroadcastMessage), +} + +impl From for OutgoingMessage { + fn from(value: EthMessage) -> Self { + OutgoingMessage::Eth(value) + } +} + +impl From for OutgoingMessage { + fn from(value: EthBroadcastMessage) -> Self { + OutgoingMessage::Broadcast(value) + } +} diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 177865dee..f83eafbdb 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -68,7 +68,7 @@ pub(crate) enum PendingSessionEvent { session_id: SessionId, remote_addr: SocketAddr, /// The remote node's public key - node_id: PeerId, + peer_id: PeerId, capabilities: Arc, status: Status, conn: EthStream>>, @@ -84,7 +84,7 @@ pub(crate) enum PendingSessionEvent { OutgoingConnectionError { remote_addr: SocketAddr, session_id: SessionId, - node_id: PeerId, + peer_id: PeerId, error: io::Error, }, /// Thrown when authentication via Ecies failed. @@ -107,19 +107,26 @@ pub(crate) enum SessionCommand { /// [`SessionManager`](crate::session::SessionManager) #[derive(Debug)] pub(crate) enum ActiveSessionMessage { - /// Session disconnected. - Closed { node_id: PeerId, remote_addr: SocketAddr }, + /// Session was gracefully disconnected. + Disconnected { peer_id: PeerId, remote_addr: SocketAddr }, + /// Session was closed due an error + ClosedOnConnectionError { + peer_id: PeerId, + remote_addr: SocketAddr, + /// The error that caused the session to close + error: EthStreamError, + }, /// A session received a valid message via RLPx. ValidMessage { /// Identifier of the remote peer. - node_id: PeerId, + peer_id: PeerId, /// Message received from the peer. message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidMessage { /// Identifier of the remote peer. - node_id: PeerId, + peer_id: PeerId, /// Announced capabilities of the remote peer. capabilities: Arc, /// Message received from the peer. diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 69e6b285b..1deb14f02 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -52,7 +52,7 @@ pub(crate) struct SessionManager { /// The secret key used for authenticating sessions. secret_key: SecretKey, /// The node id of node - node_id: PeerId, + peer_id: PeerId, /// The `Status` message to send to peers. status: Status, /// THe `Hello` message to send to peers. @@ -97,18 +97,18 @@ impl SessionManager { let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); let pk = secret_key.public_key(SECP256K1); - let node_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]); + let peer_id = PeerId::from_slice(&pk.serialize_uncompressed()[1..]); // TODO: make sure this is the right place to put these builders - maybe per-Network rather // than per-Session? - let hello = HelloBuilder::new(node_id).build(); + let hello = HelloBuilder::new(peer_id).build(); let status = StatusBuilder::default().build(); let fork_filter = Hardfork::Frontier.fork_filter(); Self { next_id: 0, secret_key, - node_id, + peer_id, status, hello, fork_filter, @@ -169,7 +169,7 @@ impl SessionManager { } /// Starts a new pending session from the local node to the given remote node. - pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_node_id: PeerId) { + pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) { let session_id = self.next_id(); let (disconnect_tx, disconnect_rx) = oneshot::channel(); let pending_events = self.pending_sessions_tx.clone(); @@ -178,7 +178,7 @@ impl SessionManager { pending_events, session_id, remote_addr, - remote_node_id, + remote_peer_id, self.secret_key, self.hello.clone(), self.status, @@ -218,17 +218,33 @@ impl SessionManager { } Poll::Ready(Some(event)) => { return match event { - ActiveSessionMessage::Closed { node_id, remote_addr } => { - trace!(?node_id, target = "net::session", "closed active session."); - let _ = self.active_sessions.remove(&node_id); - Poll::Ready(SessionEvent::Disconnected { node_id, remote_addr }) + ActiveSessionMessage::Disconnected { peer_id, remote_addr } => { + trace!( + ?peer_id, + target = "net::session", + "gracefully disconnected active session." + ); + let _ = self.active_sessions.remove(&peer_id); + Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr }) } - ActiveSessionMessage::ValidMessage { node_id, message } => { - // TODO: since all messages are known they should be decoded in the session - Poll::Ready(SessionEvent::ValidMessage { node_id, message }) + ActiveSessionMessage::ClosedOnConnectionError { + peer_id, + remote_addr, + error, + } => { + trace!(?peer_id, ?error, target = "net::session", "closed session."); + let _ = self.active_sessions.remove(&peer_id); + Poll::Ready(SessionEvent::SessionClosedOnConnectionError { + remote_addr, + peer_id, + error, + }) } - ActiveSessionMessage::InvalidMessage { node_id, capabilities, message } => { - Poll::Ready(SessionEvent::InvalidMessage { node_id, message, capabilities }) + ActiveSessionMessage::ValidMessage { peer_id, message } => { + Poll::Ready(SessionEvent::ValidMessage { peer_id, message }) + } + ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => { + Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities }) } } } @@ -253,7 +269,7 @@ impl SessionManager { PendingSessionEvent::Established { session_id, remote_addr, - node_id, + peer_id, capabilities, conn, status, @@ -266,11 +282,12 @@ impl SessionManager { let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); - let messages = PeerRequestSender { peer: node_id, to_session_tx }; + let messages = PeerRequestSender { peer_id, to_session_tx }; let session = ActiveSession { next_id: 0, - remote_node_id: node_id, + remote_peer_id: peer_id, + remote_addr, remote_capabilities: Arc::clone(&capabilities), session_id, commands_rx: ReceiverStream::new(commands_rx), @@ -278,23 +295,24 @@ impl SessionManager { request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, - buffered_outgoing: Default::default(), + queued_outgoing: Default::default(), + received_requests: Default::default(), }; self.spawn(session); let handle = ActiveSessionHandle { session_id, - remote_id: node_id, + remote_id: peer_id, established: Instant::now(), capabilities: Arc::clone(&capabilities), commands_to_session, }; - self.active_sessions.insert(node_id, handle); + self.active_sessions.insert(peer_id, handle); return Poll::Ready(SessionEvent::SessionEstablished { - node_id, + peer_id, remote_addr, capabilities, status, @@ -316,10 +334,10 @@ impl SessionManager { error, }) } - Direction::Outgoing(node_id) => { + Direction::Outgoing(peer_id) => { Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { remote_addr, - node_id, + peer_id, error, }) } @@ -328,14 +346,14 @@ impl SessionManager { PendingSessionEvent::OutgoingConnectionError { remote_addr, session_id, - node_id, + peer_id, error, } => { trace!( ?error, ?session_id, ?remote_addr, - ?node_id, + ?peer_id, target = "net::session", "connection refused" ); @@ -408,7 +426,7 @@ pub(crate) enum SessionEvent { /// /// This session is now able to exchange data. SessionEstablished { - node_id: PeerId, + peer_id: PeerId, remote_addr: SocketAddr, capabilities: Arc, status: Status, @@ -416,13 +434,13 @@ pub(crate) enum SessionEvent { }, /// A session received a valid message via RLPx. ValidMessage { - node_id: PeerId, + peer_id: PeerId, /// Message received from the peer. message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidMessage { - node_id: PeerId, + peer_id: PeerId, /// Announced capabilities of the remote peer. capabilities: Arc, /// Message received from the peer. @@ -433,13 +451,22 @@ pub(crate) enum SessionEvent { /// Closed an outgoing pending session during authentication. OutgoingPendingSessionClosed { remote_addr: SocketAddr, - node_id: PeerId, + peer_id: PeerId, error: Option, }, /// Failed to establish a tcp stream - OutgoingConnectionError { remote_addr: SocketAddr, node_id: PeerId, error: io::Error }, - /// Active session was disconnected. - Disconnected { node_id: PeerId, remote_addr: SocketAddr }, + OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error }, + /// Session was closed due to an error + SessionClosedOnConnectionError { + /// The id of the remote peer. + peer_id: PeerId, + /// The socket we were connected to. + remote_addr: SocketAddr, + /// The error that caused the session to close + error: EthStreamError, + }, + /// Active session was gracefully disconnected. + Disconnected { peer_id: PeerId, remote_addr: SocketAddr }, } /// The error thrown when the max configured limit has been reached and no more connections are @@ -478,13 +505,13 @@ async fn start_pending_incoming_session( } /// Starts the authentication process for a connection initiated by a remote peer. -#[instrument(skip_all, fields(%remote_addr, node_id), target = "net")] +#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")] async fn start_pending_outbound_session( disconnect_rx: oneshot::Receiver<()>, events: mpsc::Sender, session_id: SessionId, remote_addr: SocketAddr, - remote_node_id: PeerId, + remote_peer_id: PeerId, secret_key: SecretKey, hello: HelloMessage, status: Status, @@ -497,7 +524,7 @@ async fn start_pending_outbound_session( .send(PendingSessionEvent::OutgoingConnectionError { remote_addr, session_id, - node_id: remote_node_id, + peer_id: remote_peer_id, error, }) .await; @@ -511,7 +538,7 @@ async fn start_pending_outbound_session( session_id, remote_addr, secret_key, - Direction::Outgoing(remote_node_id), + Direction::Outgoing(remote_peer_id), hello, status, fork_filter, @@ -550,8 +577,8 @@ async fn authenticate( return } }, - Direction::Outgoing(remote_node_id) => { - match ECIESStream::connect(stream, secret_key, remote_node_id).await { + Direction::Outgoing(remote_peer_id) => { + match ECIESStream::connect(stream, secret_key, remote_peer_id).await { Ok(stream) => stream, Err(error) => { let _ = events @@ -638,7 +665,7 @@ async fn authenticate_stream( PendingSessionEvent::Established { session_id, remote_addr, - node_id: their_hello.id, + peer_id: their_hello.id, capabilities: Arc::new(Capabilities::from(their_hello.capabilities)), status: their_status, conn: eth_stream, diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index baf950341..8b3f7a6dd 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -223,8 +223,7 @@ where fn on_peer_action(&mut self, action: PeerAction) { match action { PeerAction::Connect { peer_id, remote_addr } => { - self.queued_messages - .push_back(StateAction::Connect { node_id: peer_id, remote_addr }); + self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr }); } PeerAction::Disconnect { peer_id, reason } => { self.state_fetcher.on_pending_disconnect(&peer_id); @@ -388,7 +387,7 @@ pub enum StateAction { hashes: Arc, }, /// Create a new connection to the given node. - Connect { remote_addr: SocketAddr, node_id: PeerId }, + Connect { remote_addr: SocketAddr, peer_id: PeerId }, /// Disconnect an existing connection Disconnect { peer_id: PeerId, diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 81f26036a..e3444b672 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -72,19 +72,19 @@ where fn on_session_event(&mut self, event: SessionEvent) -> Option { match event { SessionEvent::SessionEstablished { - node_id, + peer_id, remote_addr, capabilities, status, messages, } => match self.state.on_session_activated( - node_id, + peer_id, capabilities.clone(), status, messages.clone(), ) { Ok(_) => Some(SwarmEvent::SessionEstablished { - node_id, + peer_id, remote_addr, capabilities, messages, @@ -98,24 +98,31 @@ where None } }, - SessionEvent::ValidMessage { node_id, message } => { - Some(SwarmEvent::ValidMessage { node_id, message }) + SessionEvent::ValidMessage { peer_id, message } => { + Some(SwarmEvent::ValidMessage { peer_id, message }) } - SessionEvent::InvalidMessage { node_id, capabilities, message } => { - Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message }) + SessionEvent::InvalidMessage { peer_id, capabilities, message } => { + Some(SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message }) } SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => { Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error }) } - SessionEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error } => { - Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, node_id, error }) + SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { + Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error }) } - SessionEvent::Disconnected { node_id, remote_addr } => { - self.state.on_session_closed(node_id); - Some(SwarmEvent::SessionClosed { node_id, remote_addr }) + SessionEvent::Disconnected { peer_id, remote_addr } => { + self.state.on_session_closed(peer_id); + Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None }) } - SessionEvent::OutgoingConnectionError { remote_addr, node_id, error } => { - Some(SwarmEvent::OutgoingConnectionError { node_id, remote_addr, error }) + SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => { + self.state.on_session_closed(peer_id); + + // TODO(mattsse): reputation change on error + + Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) }) + } + SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { + Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error }) } } } @@ -146,11 +153,11 @@ where /// Hook for actions pulled from the state fn on_state_action(&mut self, event: StateAction) -> Option { match event { - StateAction::Connect { remote_addr, node_id } => { - self.sessions.dial_outbound(remote_addr, node_id); + StateAction::Connect { remote_addr, peer_id } => { + self.sessions.dial_outbound(remote_addr, peer_id); } - StateAction::Disconnect { peer_id: node_id, reason } => { - self.sessions.disconnect(node_id, reason); + StateAction::Disconnect { peer_id, reason } => { + self.sessions.disconnect(peer_id, reason); } StateAction::NewBlock { peer_id, block: msg } => { let msg = PeerMessage::NewBlock(msg); @@ -220,13 +227,13 @@ pub enum SwarmEvent { /// Events related to the actual network protocol. ValidMessage { /// The peer that sent the message - node_id: PeerId, + peer_id: PeerId, /// Message received from the peer message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidCapabilityMessage { - node_id: PeerId, + peer_id: PeerId, /// Announced capabilities of the remote peer. capabilities: Arc, /// Message received from the peer. @@ -255,30 +262,25 @@ pub enum SwarmEvent { remote_addr: SocketAddr, }, SessionEstablished { - node_id: PeerId, + peer_id: PeerId, remote_addr: SocketAddr, capabilities: Arc, messages: PeerRequestSender, }, SessionClosed { - node_id: PeerId, - remote_addr: SocketAddr, - }, - /// Closed an incoming pending session during authentication. - IncomingPendingSessionClosed { + peer_id: PeerId, remote_addr: SocketAddr, + /// Whether the session was closed due to an error error: Option, }, + /// Closed an incoming pending session during authentication. + IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option }, /// Closed an outgoing pending session during authentication. OutgoingPendingSessionClosed { remote_addr: SocketAddr, - node_id: PeerId, + peer_id: PeerId, error: Option, }, /// Failed to establish a tcp stream to the given address/node - OutgoingConnectionError { - remote_addr: SocketAddr, - node_id: PeerId, - error: io::Error, - }, + OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error }, }