From 836ad6aaee45c31990dccb19662c40b409493151 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 8 Nov 2022 17:52:17 +0100 Subject: [PATCH] feat(net): add request/response message types (#176) * feat(net): add request/response message types * chore: rustfmt --- crates/net/eth-wire/src/types/message.rs | 4 +- crates/net/network/src/message.rs | 124 +++++++++++++++++++++-- crates/net/network/src/session/active.rs | 10 +- crates/net/network/src/session/mod.rs | 9 +- crates/net/network/src/state.rs | 22 ++-- 5 files changed, 139 insertions(+), 30 deletions(-) diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs index 3fbcc0863..c5e3ffc8c 100644 --- a/crates/net/eth-wire/src/types/message.rs +++ b/crates/net/eth-wire/src/types/message.rs @@ -117,9 +117,9 @@ impl From for ProtocolMessage { /// correlate request-response message pairs. This allows for request multiplexing. #[derive(Clone, Debug, PartialEq, Eq)] pub enum EthMessage { - // Status is required for the protocol handshake + /// Status is required for the protocol handshake Status(Status), - // The following messages are broadcast to the network + /// The following messages are broadcast to the network NewBlockHashes(NewBlockHashes), NewBlock(Box), Transactions(Transactions), diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 662c0e445..2ba0ce938 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -3,7 +3,12 @@ //! An RLPx stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, -use reth_eth_wire::{BlockHeaders, GetBlockHeaders}; +use futures::FutureExt; +use reth_eth_wire::{ + BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, + GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions, Receipts, Transactions, +}; +use std::task::{ready, Context, Poll}; use crate::NodeId; use reth_eth_wire::capability::CapabilityMessage; @@ -38,9 +43,51 @@ impl From for RequestError { } } +/// Represents all messages that can be sent to a peer session +#[derive(Debug)] +pub enum PeerMessage { + /// Announce new block hashes + NewBlockHashes(NewBlockHashes), + /// Broadcast new block. + NewBlock(Box), + /// Broadcast transactions. + Transactions(Transactions), + /// All `eth` request variants. + EthRequest(PeerRequest), + /// Other than eth namespace message + Other(CapabilityMessage), +} + +/// All Request variants of an [`EthMessage`] +/// +/// Note: These variants come without a request ID, as it's expected that the peer session will +/// manage those +#[derive(Debug, Clone)] +#[allow(missing_docs)] +#[allow(clippy::enum_variant_names)] +pub enum EthRequest { + GetBlockHeaders(GetBlockHeaders), + GetBlockBodies(GetBlockBodies), + GetPooledTransactions(GetPooledTransactions), + GetNodeData(GetNodeData), + GetReceipts(GetReceipts), +} + +/// Corresponding Response variants for [`EthRequest`] +#[derive(Debug, Clone)] +#[allow(missing_docs)] +pub enum EthResponse { + BlockHeaders(BlockHeaders), + BlockBodies(BlockBodies), + PooledTransactions(PooledTransactions), + NodeData(NodeData), + Receipts(Receipts), +} + /// Protocol related request messages that expect a response #[derive(Debug)] -pub enum CapabilityRequest { +#[allow(clippy::enum_variant_names)] +pub enum PeerRequest { /// Request Block headers from the peer. /// /// The response should be sent through the channel. @@ -48,19 +95,80 @@ pub enum CapabilityRequest { request: GetBlockHeaders, response: oneshot::Sender>, }, + /// Request Block headers from the peer. + /// + /// The response should be sent through the channel. + GetBlockBodies { + request: GetBlockHeaders, + response: oneshot::Sender>, + }, + /// Request pooled transactions from the peer. + /// + /// The response should be sent through the channel. + GetPooledTransactions { + request: GetPooledTransactions, + response: oneshot::Sender>, + }, + /// Request NodeData from the peer. + /// + /// The response should be sent through the channel. + GetNodeData { request: GetNodeData, response: oneshot::Sender> }, + /// Request Receipts from the peer. + /// + /// The response should be sent through the channel. + GetReceipts { request: GetReceipts, response: oneshot::Sender> }, } -/// The actual response object +/// Corresponding variant for [`PeerRequest`]. #[derive(Debug)] -pub enum CapabilityResponse { - GetBlockHeaders(RequestResult), +pub enum PeerResponse { + BlockHeaders { response: oneshot::Receiver> }, + BlockBodies { response: oneshot::Receiver> }, + PooledTransactions { response: oneshot::Receiver> }, + NodeData { response: oneshot::Receiver> }, + Receipts { response: oneshot::Receiver> }, } -/// A Cloneable connection for sending messages directly to the session of a peer. +// === impl PeerResponse === + +impl PeerResponse { + /// Polls the type to completion. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + macro_rules! poll_request { + ($response:ident, $item:ident, $cx:ident) => { + match ready!($response.poll_unpin($cx)) { + Ok(res) => res.map(EthResponse::$item), + Err(err) => Err(err.into()), + } + }; + } + + let res = match self { + PeerResponse::BlockHeaders { response } => { + poll_request!(response, BlockHeaders, cx) + } + PeerResponse::BlockBodies { response } => { + poll_request!(response, BlockBodies, cx) + } + PeerResponse::PooledTransactions { response } => { + poll_request!(response, PooledTransactions, cx) + } + PeerResponse::NodeData { response } => { + poll_request!(response, NodeData, cx) + } + PeerResponse::Receipts { response } => { + poll_request!(response, Receipts, cx) + } + }; + Poll::Ready(res) + } +} + +/// A Cloneable connection for sending _requests_ directly to the session of a peer. #[derive(Debug, Clone)] -pub struct PeerMessageSender { +pub struct PeerRequestSender { /// id of the remote node. pub(crate) peer: NodeId, /// The Sender half connected to a session. - pub(crate) to_session_tx: mpsc::Sender, + pub(crate) to_session_tx: mpsc::Sender, } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f0ee4da31..91db0ef2c 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -1,13 +1,13 @@ //! Represents an established session. use crate::{ + message::PeerRequest, session::{ handle::{ActiveSessionMessage, SessionCommand}, SessionId, }, NodeId, }; - use fnv::FnvHashMap; use futures::{stream::Fuse, Sink, Stream}; use pin_project::pin_project; @@ -27,6 +27,8 @@ use tokio_stream::wrappers::ReceiverStream; /// node or read from connection) and emitting events back to the [`SessionHandler`]. #[pin_project] pub(crate) struct ActiveSession { + /// Keeps track of request ids. + pub(crate) next_id: usize, /// The underlying connection. #[pin] pub(crate) conn: ECIESStream, @@ -43,9 +45,9 @@ pub(crate) struct ActiveSession { pub(crate) to_session: mpsc::Sender, /// Incoming request to send to delegate to the remote peer. #[pin] - pub(crate) messages_rx: Fuse>, + pub(crate) request_tx: Fuse>, /// All requests currently in progress. - pub(crate) inflight_requests: FnvHashMap, + pub(crate) inflight_requests: FnvHashMap, /// Buffered messages that should be sent to the remote peer. pub(crate) buffered_outgoing: VecDeque, } @@ -76,7 +78,7 @@ impl Future for ActiveSession { } } - while let Poll::Ready(Some(_msg)) = this.messages_rx.as_mut().poll_next(cx) { + while let Poll::Ready(Some(_req)) = this.request_tx.as_mut().poll_next(cx) { progress = true; // TODO handle request } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index b60f2527e..aa92fab76 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,5 +1,5 @@ //! Support for handling peer sessions. -pub use crate::message::PeerMessageSender; +pub use crate::message::PeerRequestSender; use crate::{ session::{ active::ActiveSession, @@ -234,15 +234,16 @@ impl SessionManager { let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); - let messages = PeerMessageSender { peer: node_id, to_session_tx }; + let messages = PeerRequestSender { peer: node_id, to_session_tx }; let session = ActiveSession { + next_id: 0, remote_node_id: node_id, remote_capabilities: Arc::clone(&capabilities), session_id, commands_rx: ReceiverStream::new(commands_rx), to_session: self.active_session_tx.clone(), - messages_rx: ReceiverStream::new(messages_rx).fuse(), + request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, buffered_outgoing: Default::default(), @@ -379,7 +380,7 @@ pub(crate) enum SessionEvent { remote_addr: SocketAddr, capabilities: Arc, status: Status, - messages: PeerMessageSender, + messages: PeerRequestSender, }, /// A session received a valid message via RLPx. ValidMessage { diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 7c8c8843a..12c1036ad 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -3,13 +3,11 @@ use crate::{ discovery::{Discovery, DiscoveryEvent}, fetch::StateFetcher, - message::CapabilityResponse, + message::{EthResponse, PeerRequestSender, PeerResponse}, peers::{PeerAction, PeersManager}, NodeId, }; -use futures::FutureExt; -use crate::message::PeerMessageSender; use reth_eth_wire::{capability::Capabilities, Status}; use reth_interfaces::provider::BlockProvider; use reth_primitives::{H256, U256}; @@ -20,7 +18,7 @@ use std::{ task::{Context, Poll}, time::Instant, }; -use tokio::sync::oneshot; + use tracing::trace; /// The [`NetworkState`] keeps track of the state of all peers in the network. @@ -76,7 +74,7 @@ where _node_id: NodeId, _capabilities: Arc, _status: Status, - _messages: PeerMessageSender, + _messages: PeerRequestSender, ) { // TODO notify fetecher as well } @@ -119,7 +117,7 @@ where fn disconnect_session(&mut self, _node: NodeId) {} /// Invoked when received a response from a connected peer. - fn on_response(&mut self, _node: NodeId, _resp: CapabilityResponse) {} + fn on_eth_response(&mut self, _node: NodeId, _resp: EthResponse) {} /// Advances the state pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { @@ -138,7 +136,7 @@ where // poll all connected peers for responses for (id, peer) in self.connected_peers.iter_mut() { if let Some(response) = peer.pending_response.as_mut() { - match response.poll_unpin(cx) { + match response.poll(cx) { Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)), Poll::Ready(Err(_)) => { trace!( @@ -161,7 +159,7 @@ where } for (id, resp) in received_responses { - self.on_response(id, resp); + self.on_eth_response(id, resp); } // poll peer manager @@ -185,9 +183,9 @@ pub struct ConnectedPeer { /// Best block number of the peer. pub(crate) best_number: U256, /// A communication channel directly to the session service. - pub(crate) message_tx: PeerMessageSender, + pub(crate) message_tx: PeerRequestSender, /// The response receiver for a currently active request to that peer. - pub(crate) pending_response: Option>, + pub(crate) pending_response: Option, } /// Tracks the current state of the peer session @@ -201,12 +199,12 @@ pub enum PeerSessionState { Incoming { /// How long to keep this open. until: Instant, - sender: PeerMessageSender, + sender: PeerRequestSender, }, /// Node is connected to the peer and is ready to Ready { /// Communication channel directly to the session task - sender: PeerMessageSender, + sender: PeerRequestSender, }, }