feat(net): add active session type (#169)

* refactor(net): unify capability types

* refactor(net): unify capability types

* feat(net): add active session type
This commit is contained in:
Matthias Seitz
2022-11-07 23:15:38 +01:00
committed by GitHub
parent 86ffb4756d
commit 1408309b74
6 changed files with 202 additions and 42 deletions

View File

@ -5,6 +5,8 @@
use reth_eth_wire::{BlockHeaders, GetBlockHeaders};
use crate::NodeId;
use reth_eth_wire::capability::CapabilityMessage;
use tokio::sync::{mpsc, oneshot};
/// Result alias for result of a request.
@ -53,3 +55,12 @@ pub enum CapabilityRequest {
pub enum CapabilityResponse {
GetBlockHeaders(RequestResult<BlockHeaders>),
}
/// A Cloneable connection for sending messages directly to the session of a peer.
#[derive(Debug, Clone)]
pub struct PeerMessageSender {
/// id of the remote node.
pub(crate) peer: NodeId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<CapabilityMessage>,
}

View File

@ -0,0 +1,114 @@
//! Represents an established session.
use crate::{
session::{
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
NodeId,
};
use fnv::FnvHashMap;
use futures::{stream::Fuse, Sink, Stream};
use pin_project::pin_project;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
/// The type that advances an established session by listening for incoming messages (from local
/// node or read from connection) and emitting events back to the [`SessionHandler`].
#[pin_project]
pub(crate) struct ActiveSession {
/// The underlying connection.
#[pin]
pub(crate) conn: ECIESStream<TcpStream>,
/// Identifier of the node we're connected to.
pub(crate) remote_node_id: NodeId,
/// All capabilities the peer announced
pub(crate) remote_capabilities: Arc<Capabilities>,
/// Internal identifier of this session
pub(crate) session_id: SessionId,
/// Incoming commands from the manager
#[pin]
pub(crate) commands_rx: ReceiverStream<SessionCommand>,
/// Sink to send messages to the [`SessionManager`].
pub(crate) to_session: mpsc::Sender<ActiveSessionMessage>,
/// Incoming request to send to delegate to the remote peer.
#[pin]
pub(crate) messages_rx: Fuse<ReceiverStream<CapabilityMessage>>,
/// All requests currently in progress.
pub(crate) inflight_requests: FnvHashMap<u64, ()>,
/// Buffered messages that should be sent to the remote peer.
pub(crate) buffered_outgoing: VecDeque<CapabilityMessage>,
}
impl Future for ActiveSession {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let mut progress = false;
// we prioritize incoming messages
loop {
match this.commands_rx.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// this is only possible when the manager was dropped, in which case we also
// terminate this session
return Poll::Ready(())
}
Poll::Ready(Some(_cmd)) => {
progress = true;
// TODO handle command
continue
}
}
}
while let Poll::Ready(Some(_msg)) = this.messages_rx.as_mut().poll_next(cx) {
progress = true;
// TODO handle request
}
// send and flush
while this.conn.as_mut().poll_ready(cx).is_ready() {
if let Some(_msg) = this.buffered_outgoing.pop_front() {
progress = true;
// TODO encode message and start send
} else {
break
}
}
loop {
match this.conn.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// disconnected
}
Poll::Ready(Some(_msg)) => {
progress = true;
// decode and handle message
continue
}
}
}
if !progress {
return Poll::Pending
}
}
}
}

View File

@ -4,7 +4,10 @@ use crate::{
NodeId,
};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
Status,
};
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
net::TcpStream,
@ -36,7 +39,7 @@ pub(crate) struct ActiveSessionHandle {
/// Announced capabilities of the peer.
pub(crate) capabilities: Arc<Capabilities>,
/// Sender half of the command channel used send commands _to_ the spawned session
pub(crate) commands: mpsc::Sender<SessionCommand>,
pub(crate) commands_to_session: mpsc::Sender<SessionCommand>,
}
// === impl ActiveSessionHandle ===
@ -45,7 +48,7 @@ impl ActiveSessionHandle {
/// Sends a disconnect command to the session.
pub(crate) fn disconnect(&self) {
// Note: we clone the sender which ensures the channel has capacity to send the message
let _ = self.commands.clone().try_send(SessionCommand::Disconnect);
let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect);
}
}
@ -58,12 +61,14 @@ impl ActiveSessionHandle {
pub(crate) enum PendingSessionEvent {
/// Initial handshake step was successful <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#initial-handshake>
SuccessfulHandshake { remote_addr: SocketAddr, session_id: SessionId },
/// Represents a successful `Hello` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
Hello {
/// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
Established {
session_id: SessionId,
remote_addr: SocketAddr,
node_id: NodeId,
capabilities: Arc<Capabilities>,
stream: ECIESStream<TcpStream>,
status: Status,
conn: ECIESStream<TcpStream>,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {
@ -114,12 +119,3 @@ pub(crate) enum ActiveSessionMessage {
message: CapabilityMessage,
},
}
/// A Cloneable connection for sending messages directly to the session of a peer.
#[derive(Debug, Clone)]
pub struct PeerMessageSender {
/// id of the remote node.
pub(crate) peer: NodeId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<CapabilityMessage>,
}

View File

@ -1,17 +1,20 @@
//! Support for handling peer sessions.
pub use crate::message::PeerMessageSender;
use crate::{
session::handle::{
session::{
active::ActiveSession,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
},
},
NodeId,
};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
pub use handle::PeerMessageSender;
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
UnauthedEthStream,
Status, UnauthedEthStream,
};
use secp256k1::{SecretKey, SECP256K1};
use std::{
@ -20,6 +23,7 @@ use std::{
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tokio::{
net::TcpStream,
@ -29,6 +33,7 @@ use tokio::{
use tokio_stream::wrappers::ReceiverStream;
use tracing::{instrument, trace, warn};
mod active;
mod handle;
/// Internal identifier for active sessions.
@ -213,29 +218,55 @@ impl SessionManager {
"successful handshake"
);
}
PendingSessionEvent::Hello {
PendingSessionEvent::Established {
session_id,
node_id: _,
capabilities: _,
stream: _,
remote_addr,
node_id,
capabilities,
conn,
status,
} => {
// move from pending to established.
let _ = self.pending_sessions.remove(&session_id);
// TODO spawn the authenticated session
// let session = ActiveSessionHandle {
// session_id,
// remote_id: node_id,
// established: Instant::now(),
// capabilities,
// commands
// };
// self.active_sessions.insert(node_id, session);
// return Poll::Ready(SessionEvent::SessionAuthenticated {
// node_id,
// capabilities,
// messages: ()
// })
let (commands_to_session, commands_rx) =
mpsc::channel(self.session_command_buffer);
let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
let messages = PeerMessageSender { peer: node_id, to_session_tx };
let session = ActiveSession {
remote_node_id: node_id,
remote_capabilities: Arc::clone(&capabilities),
session_id,
commands_rx: ReceiverStream::new(commands_rx),
to_session: self.active_session_tx.clone(),
messages_rx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
buffered_outgoing: Default::default(),
};
self.spawn(session);
let handle = ActiveSessionHandle {
session_id,
remote_id: node_id,
established: Instant::now(),
capabilities: Arc::clone(&capabilities),
commands_to_session,
};
self.active_sessions.insert(node_id, handle);
return Poll::Ready(SessionEvent::SessionEstablished {
node_id,
remote_addr,
capabilities,
status,
messages,
})
}
PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
trace!(
@ -343,10 +374,11 @@ pub(crate) enum SessionEvent {
/// A new session was successfully authenticated.
///
/// This session is now able to exchange data.
SessionAuthenticated {
SessionEstablished {
node_id: NodeId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
status: Status,
messages: PeerMessageSender,
},
/// A session received a valid message via RLPx.

View File

@ -5,12 +5,12 @@ use crate::{
fetch::StateFetcher,
message::CapabilityResponse,
peers::{PeerAction, PeersManager},
session::PeerMessageSender,
NodeId,
};
use futures::FutureExt;
use reth_eth_wire::capability::Capabilities;
use crate::message::PeerMessageSender;
use reth_eth_wire::{capability::Capabilities, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{H256, U256};
use std::{
@ -75,6 +75,7 @@ where
&mut self,
_node_id: NodeId,
_capabilities: Arc<Capabilities>,
_status: Status,
_messages: PeerMessageSender,
) {
// TODO notify fetecher as well

View File

@ -62,8 +62,14 @@ where
/// Handles a polled [`SessionEvent`]
fn on_session_event(&mut self, event: SessionEvent) -> Option<SwarmEvent> {
match event {
SessionEvent::SessionAuthenticated { node_id, remote_addr, capabilities, messages } => {
self.state.on_session_authenticated(node_id, capabilities, messages);
SessionEvent::SessionEstablished {
node_id,
remote_addr,
capabilities,
status,
messages,
} => {
self.state.on_session_authenticated(node_id, capabilities, status, messages);
Some(SwarmEvent::SessionEstablished { node_id, remote_addr })
}
SessionEvent::ValidMessage { node_id, message } => {