diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 775391232..662c0e445 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -5,6 +5,8 @@ use reth_eth_wire::{BlockHeaders, GetBlockHeaders}; +use crate::NodeId; +use reth_eth_wire::capability::CapabilityMessage; use tokio::sync::{mpsc, oneshot}; /// Result alias for result of a request. @@ -53,3 +55,12 @@ pub enum CapabilityRequest { pub enum CapabilityResponse { GetBlockHeaders(RequestResult), } + +/// A Cloneable connection for sending messages directly to the session of a peer. +#[derive(Debug, Clone)] +pub struct PeerMessageSender { + /// id of the remote node. + pub(crate) peer: NodeId, + /// The Sender half connected to a session. + pub(crate) to_session_tx: mpsc::Sender, +} diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs new file mode 100644 index 000000000..f0ee4da31 --- /dev/null +++ b/crates/net/network/src/session/active.rs @@ -0,0 +1,114 @@ +//! Represents an established session. + +use crate::{ + session::{ + handle::{ActiveSessionMessage, SessionCommand}, + SessionId, + }, + NodeId, +}; + +use fnv::FnvHashMap; +use futures::{stream::Fuse, Sink, Stream}; +use pin_project::pin_project; +use reth_ecies::stream::ECIESStream; +use reth_eth_wire::capability::{Capabilities, CapabilityMessage}; +use std::{ + collections::VecDeque, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{net::TcpStream, sync::mpsc}; +use tokio_stream::wrappers::ReceiverStream; + +/// The type that advances an established session by listening for incoming messages (from local +/// node or read from connection) and emitting events back to the [`SessionHandler`]. +#[pin_project] +pub(crate) struct ActiveSession { + /// The underlying connection. + #[pin] + pub(crate) conn: ECIESStream, + /// Identifier of the node we're connected to. + pub(crate) remote_node_id: NodeId, + /// All capabilities the peer announced + pub(crate) remote_capabilities: Arc, + /// Internal identifier of this session + pub(crate) session_id: SessionId, + /// Incoming commands from the manager + #[pin] + pub(crate) commands_rx: ReceiverStream, + /// Sink to send messages to the [`SessionManager`]. + pub(crate) to_session: mpsc::Sender, + /// Incoming request to send to delegate to the remote peer. + #[pin] + pub(crate) messages_rx: Fuse>, + /// All requests currently in progress. + pub(crate) inflight_requests: FnvHashMap, + /// Buffered messages that should be sent to the remote peer. + pub(crate) buffered_outgoing: VecDeque, +} + +impl Future for ActiveSession { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let mut progress = false; + // we prioritize incoming messages + loop { + match this.commands_rx.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(None) => { + // this is only possible when the manager was dropped, in which case we also + // terminate this session + return Poll::Ready(()) + } + Poll::Ready(Some(_cmd)) => { + progress = true; + // TODO handle command + + continue + } + } + } + + while let Poll::Ready(Some(_msg)) = this.messages_rx.as_mut().poll_next(cx) { + progress = true; + // TODO handle request + } + + // send and flush + while this.conn.as_mut().poll_ready(cx).is_ready() { + if let Some(_msg) = this.buffered_outgoing.pop_front() { + progress = true; + // TODO encode message and start send + } else { + break + } + } + + loop { + match this.conn.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(None) => { + // disconnected + } + Poll::Ready(Some(_msg)) => { + progress = true; + // decode and handle message + + continue + } + } + } + + if !progress { + return Poll::Pending + } + } + } +} diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 01b227a83..fce7b5d03 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -4,7 +4,10 @@ use crate::{ NodeId, }; use reth_ecies::{stream::ECIESStream, ECIESError}; -use reth_eth_wire::capability::{Capabilities, CapabilityMessage}; +use reth_eth_wire::{ + capability::{Capabilities, CapabilityMessage}, + Status, +}; use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use tokio::{ net::TcpStream, @@ -36,7 +39,7 @@ pub(crate) struct ActiveSessionHandle { /// Announced capabilities of the peer. pub(crate) capabilities: Arc, /// Sender half of the command channel used send commands _to_ the spawned session - pub(crate) commands: mpsc::Sender, + pub(crate) commands_to_session: mpsc::Sender, } // === impl ActiveSessionHandle === @@ -45,7 +48,7 @@ impl ActiveSessionHandle { /// Sends a disconnect command to the session. pub(crate) fn disconnect(&self) { // Note: we clone the sender which ensures the channel has capacity to send the message - let _ = self.commands.clone().try_send(SessionCommand::Disconnect); + let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect); } } @@ -58,12 +61,14 @@ impl ActiveSessionHandle { pub(crate) enum PendingSessionEvent { /// Initial handshake step was successful SuccessfulHandshake { remote_addr: SocketAddr, session_id: SessionId }, - /// Represents a successful `Hello` exchange: - Hello { + /// Represents a successful `Hello` and `Status` exchange: + Established { session_id: SessionId, + remote_addr: SocketAddr, node_id: NodeId, capabilities: Arc, - stream: ECIESStream, + status: Status, + conn: ECIESStream, }, /// Handshake unsuccessful, session was disconnected. Disconnected { @@ -114,12 +119,3 @@ pub(crate) enum ActiveSessionMessage { message: CapabilityMessage, }, } - -/// A Cloneable connection for sending messages directly to the session of a peer. -#[derive(Debug, Clone)] -pub struct PeerMessageSender { - /// id of the remote node. - pub(crate) peer: NodeId, - /// The Sender half connected to a session. - pub(crate) to_session_tx: mpsc::Sender, -} diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 9a4bb9960..b60f2527e 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,17 +1,20 @@ //! Support for handling peer sessions. +pub use crate::message::PeerMessageSender; use crate::{ - session::handle::{ - ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + session::{ + active::ActiveSession, + handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + }, }, NodeId, }; use fnv::FnvHashMap; use futures::{future::Either, io, FutureExt, StreamExt}; -pub use handle::PeerMessageSender; use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, - UnauthedEthStream, + Status, UnauthedEthStream, }; use secp256k1::{SecretKey, SECP256K1}; use std::{ @@ -20,6 +23,7 @@ use std::{ net::SocketAddr, sync::Arc, task::{Context, Poll}, + time::Instant, }; use tokio::{ net::TcpStream, @@ -29,6 +33,7 @@ use tokio::{ use tokio_stream::wrappers::ReceiverStream; use tracing::{instrument, trace, warn}; +mod active; mod handle; /// Internal identifier for active sessions. @@ -213,29 +218,55 @@ impl SessionManager { "successful handshake" ); } - PendingSessionEvent::Hello { + PendingSessionEvent::Established { session_id, - node_id: _, - capabilities: _, - stream: _, + remote_addr, + node_id, + capabilities, + conn, + status, } => { // move from pending to established. let _ = self.pending_sessions.remove(&session_id); - // TODO spawn the authenticated session - // let session = ActiveSessionHandle { - // session_id, - // remote_id: node_id, - // established: Instant::now(), - // capabilities, - // commands - // }; - // self.active_sessions.insert(node_id, session); - // return Poll::Ready(SessionEvent::SessionAuthenticated { - // node_id, - // capabilities, - // messages: () - // }) + let (commands_to_session, commands_rx) = + mpsc::channel(self.session_command_buffer); + + let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); + + let messages = PeerMessageSender { peer: node_id, to_session_tx }; + + let session = ActiveSession { + remote_node_id: node_id, + remote_capabilities: Arc::clone(&capabilities), + session_id, + commands_rx: ReceiverStream::new(commands_rx), + to_session: self.active_session_tx.clone(), + messages_rx: ReceiverStream::new(messages_rx).fuse(), + inflight_requests: Default::default(), + conn, + buffered_outgoing: Default::default(), + }; + + self.spawn(session); + + let handle = ActiveSessionHandle { + session_id, + remote_id: node_id, + established: Instant::now(), + capabilities: Arc::clone(&capabilities), + commands_to_session, + }; + + self.active_sessions.insert(node_id, handle); + + return Poll::Ready(SessionEvent::SessionEstablished { + node_id, + remote_addr, + capabilities, + status, + messages, + }) } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { trace!( @@ -343,10 +374,11 @@ pub(crate) enum SessionEvent { /// A new session was successfully authenticated. /// /// This session is now able to exchange data. - SessionAuthenticated { + SessionEstablished { node_id: NodeId, remote_addr: SocketAddr, capabilities: Arc, + status: Status, messages: PeerMessageSender, }, /// A session received a valid message via RLPx. diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 80d581eba..7c8c8843a 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -5,12 +5,12 @@ use crate::{ fetch::StateFetcher, message::CapabilityResponse, peers::{PeerAction, PeersManager}, - session::PeerMessageSender, NodeId, }; use futures::FutureExt; -use reth_eth_wire::capability::Capabilities; +use crate::message::PeerMessageSender; +use reth_eth_wire::{capability::Capabilities, Status}; use reth_interfaces::provider::BlockProvider; use reth_primitives::{H256, U256}; use std::{ @@ -75,6 +75,7 @@ where &mut self, _node_id: NodeId, _capabilities: Arc, + _status: Status, _messages: PeerMessageSender, ) { // TODO notify fetecher as well diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index aa7ab075c..96dd6cfc8 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -62,8 +62,14 @@ where /// Handles a polled [`SessionEvent`] fn on_session_event(&mut self, event: SessionEvent) -> Option { match event { - SessionEvent::SessionAuthenticated { node_id, remote_addr, capabilities, messages } => { - self.state.on_session_authenticated(node_id, capabilities, messages); + SessionEvent::SessionEstablished { + node_id, + remote_addr, + capabilities, + status, + messages, + } => { + self.state.on_session_authenticated(node_id, capabilities, status, messages); Some(SwarmEvent::SessionEstablished { node_id, remote_addr }) } SessionEvent::ValidMessage { node_id, message } => {