feat(net): more request handling (#186)

* feat(net): more request handling

* rm uneccessary checks

* rm box
This commit is contained in:
Matthias Seitz
2022-11-10 21:24:47 +01:00
committed by GitHub
parent dd4878c0e3
commit c43d752929
7 changed files with 247 additions and 75 deletions

View File

@ -72,6 +72,12 @@ impl Capabilities {
self.inner
}
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub fn supports_eth(&self) -> bool {
self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
#[inline]
pub fn supports_eth_v66(&self) -> bool {

View File

@ -1,7 +1,7 @@
use crate::{peers::PeersConfig, session::SessionsConfig};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_eth_wire::forkid::ForkId;
use reth_primitives::Chain;
use reth_primitives::{Chain, H256};
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
@ -29,6 +29,8 @@ pub struct NetworkConfig<C> {
pub fork_id: Option<ForkId>,
/// The id of the network
pub chain: Chain,
/// Genesis hash of the network
pub genesis_hash: H256,
}
// === impl NetworkConfig ===
@ -58,6 +60,7 @@ impl<C> NetworkConfig<C> {
}
/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
#[allow(missing_docs)]
pub struct NetworkConfigBuilder<C> {
/// The client type that can interact with the chain.
client: Arc<C>,
@ -73,10 +76,9 @@ pub struct NetworkConfigBuilder<C> {
peers_config: Option<PeersConfig>,
/// How to configure the sessions manager
sessions_config: Option<SessionsConfig>,
fork_id: Option<ForkId>,
chain: Chain,
genesis_hash: H256,
}
// === impl NetworkConfigBuilder ===
@ -94,9 +96,16 @@ impl<C> NetworkConfigBuilder<C> {
sessions_config: None,
fork_id: None,
chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet),
genesis_hash: Default::default(),
}
}
/// Sets the genesis hash for the network.
pub fn genesis_hash(mut self, genesis_hash: H256) -> Self {
self.genesis_hash = genesis_hash;
self
}
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let Self {
@ -109,6 +118,7 @@ impl<C> NetworkConfigBuilder<C> {
sessions_config,
fork_id,
chain,
genesis_hash,
} = self;
NetworkConfig {
client,
@ -124,6 +134,7 @@ impl<C> NetworkConfigBuilder<C> {
sessions_config: sessions_config.unwrap_or_default(),
fork_id,
chain,
genesis_hash,
}
}
}

View File

@ -1,6 +1,9 @@
//! Fetch data from the network.
use crate::{message::RequestResult, NodeId};
use crate::{
message::{BlockRequest, RequestResult},
NodeId,
};
use futures::StreamExt;
use reth_eth_wire::{BlockBody, EthMessage};
use reth_interfaces::p2p::headers::client::HeadersRequest;
@ -34,13 +37,13 @@ pub struct StateFetcher {
impl StateFetcher {
/// Invoked when connected to a new peer.
pub(crate) fn new_connected_peer(
&mut self,
_node_id: NodeId,
_best_hash: H256,
_best_number: U256,
) {
}
pub(crate) fn new_connected_peer(&mut self, _node_id: NodeId, _best_hash: H256) {}
/// Invoked when an active session was closed.
pub(crate) fn on_session_closed(&mut self, _peer: &NodeId) {}
/// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, _peer: &NodeId) {}
/// Returns the next action to return
fn poll_action(&mut self) -> Option<FetchAction> {
@ -94,9 +97,19 @@ impl StateFetcher {
/// Called on a `GetBlockHeaders` response from a peer
pub(crate) fn on_block_headers_response(
&mut self,
_from: NodeId,
_msg: RequestResult<Vec<Header>>,
) {
_peer: NodeId,
_res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> {
None
}
/// Called on a `GetBlockBodies` response from a peer
pub(crate) fn on_block_bodies_response(
&mut self,
_peer: NodeId,
_res: RequestResult<Vec<BlockBody>>,
) -> Option<BlockResponseOutcome> {
None
}
/// Returns a new [`HeadersDownloader`] that can send requests to this type
@ -184,3 +197,15 @@ pub(crate) enum FetchAction {
request: EthMessage,
},
}
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug)]
pub(crate) enum BlockResponseOutcome {
/// Continue with another request to the peer.
Request(NodeId, BlockRequest),
/// How to handle a bad response
// TODO this should include some form of reputation change
BadResponse(NodeId),
}

View File

@ -111,6 +111,7 @@ where
listener_addr,
peers_config,
sessions_config,
genesis_hash,
..
} = config;
@ -125,7 +126,7 @@ where
let local_node_id = discovery.local_id();
let sessions = SessionManager::new(secret_key, sessions_config);
let state = NetworkState::new(client, discovery, peers_manger);
let state = NetworkState::new(client, discovery, peers_manger, genesis_hash);
let swarm = Swarm::new(incoming, sessions, state);

View File

@ -5,14 +5,16 @@
use futures::FutureExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions,
GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions, Receipts, Transactions,
BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions,
Receipts, Transactions,
};
use std::task::{ready, Context, Poll};
use crate::NodeId;
use reth_eth_wire::capability::CapabilityMessage;
use tokio::sync::{mpsc, oneshot};
use reth_primitives::{Header, Receipt, TransactionSigned};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
/// Result alias for result of a request.
pub type RequestResult<T> = Result<T, RequestError>;
@ -58,6 +60,15 @@ pub enum PeerMessage {
Other(CapabilityMessage),
}
/// Request Variants that only target block related data.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
#[allow(clippy::enum_variant_names)]
pub enum BlockRequest {
GetBlockHeaders(GetBlockHeaders),
GetBlockBodies(GetBlockBodies),
}
/// All Request variants of an [`EthMessage`]
///
/// Note: These variants come without a request ID, as it's expected that the peer session will
@ -99,8 +110,8 @@ pub enum PeerRequest {
///
/// The response should be sent through the channel.
GetBlockBodies {
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders>>,
request: GetBlockBodies,
response: oneshot::Sender<RequestResult<BlockBodies>>,
},
/// Request pooled transactions from the peer.
///
@ -133,12 +144,15 @@ pub enum PeerResponse {
impl PeerResponse {
/// Polls the type to completion.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestResult<EthResponse>> {
pub(crate) fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<PeerResponseResult, oneshot::error::RecvError>> {
macro_rules! poll_request {
($response:ident, $item:ident, $cx:ident) => {
match ready!($response.poll_unpin($cx)) {
Ok(res) => res.map(EthResponse::$item),
Err(err) => Err(err.into()),
Ok(res) => Ok(PeerResponseResult::$item(res.map(|item| item.0))),
Err(err) => Err(err),
}
};
}
@ -164,6 +178,32 @@ impl PeerResponse {
}
}
/// All response variants for [`PeerResponse`]
#[derive(Debug)]
#[allow(missing_docs)]
pub enum PeerResponseResult {
BlockHeaders(RequestResult<Vec<Header>>),
BlockBodies(RequestResult<Vec<BlockBody>>),
PooledTransactions(RequestResult<Vec<TransactionSigned>>),
NodeData(RequestResult<Vec<bytes::Bytes>>),
Receipts(RequestResult<Vec<Vec<Receipt>>>),
}
// === impl PeerResponseResult ===
impl PeerResponseResult {
/// Returns whether this result is an error.
pub fn is_err(&self) -> bool {
match self {
PeerResponseResult::BlockHeaders(res) => res.is_err(),
PeerResponseResult::BlockBodies(res) => res.is_err(),
PeerResponseResult::PooledTransactions(res) => res.is_err(),
PeerResponseResult::NodeData(res) => res.is_err(),
PeerResponseResult::Receipts(res) => res.is_err(),
}
}
}
/// A Cloneable connection for sending _requests_ directly to the session of a peer.
#[derive(Debug, Clone)]
pub struct PeerRequestSender {
@ -172,3 +212,12 @@ pub struct PeerRequestSender {
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<PeerRequest>,
}
// === impl PeerRequestSender ===
impl PeerRequestSender {
/// Attempts to immediately send a message on this Sender
pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError<PeerRequest>> {
self.to_session_tx.try_send(req)
}
}

View File

@ -3,22 +3,26 @@
use crate::{
discovery::{Discovery, DiscoveryEvent},
fetch::StateFetcher,
message::{EthResponse, PeerRequestSender, PeerResponse},
message::{PeerRequestSender, PeerResponse},
peers::{PeerAction, PeersManager},
NodeId,
};
use reth_eth_wire::{capability::Capabilities, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{H256, U256};
use reth_primitives::H256;
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tokio::sync::oneshot;
use crate::{
fetch::BlockResponseOutcome,
message::{BlockRequest, PeerRequest, PeerResponseResult},
};
use tracing::trace;
/// The [`NetworkState`] keeps track of the state of all peers in the network.
@ -36,14 +40,14 @@ pub struct NetworkState<C> {
connected_peers: HashMap<NodeId, ConnectedPeer>,
/// Manages connections to peers.
peers_manager: PeersManager,
/// Tracks the state of connected peers
peers_state: HashMap<NodeId, PeerSessionState>,
/// Buffered messages until polled.
queued_messages: VecDeque<StateAction>,
/// The client type that can interact with the chain.
client: Arc<C>,
/// Network discovery.
discovery: Discovery,
/// The genesis hash of the network we're on
genesis_hash: H256,
/// The type that handles requests.
///
/// The fetcher streams RLPx related requests on a per-peer basis to this type. This type will
@ -56,31 +60,57 @@ where
C: BlockProvider,
{
/// Create a new state instance with the given params
pub(crate) fn new(client: Arc<C>, discovery: Discovery, peers_manager: PeersManager) -> Self {
pub(crate) fn new(
client: Arc<C>,
discovery: Discovery,
peers_manager: PeersManager,
genesis_hash: H256,
) -> Self {
Self {
connected_peers: Default::default(),
peers_manager,
peers_state: Default::default(),
queued_messages: Default::default(),
client,
discovery,
genesis_hash,
state_fetcher: Default::default(),
}
}
/// Event hook for an authenticated session for the peer.
pub(crate) fn on_session_authenticated(
/// Event hook for an activated session for the peer.
///
/// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and
/// should be rejected.
pub(crate) fn on_session_activated(
&mut self,
_node_id: NodeId,
_capabilities: Arc<Capabilities>,
_status: Status,
_messages: PeerRequestSender,
) {
// TODO notify fetecher as well
peer: NodeId,
capabilities: Arc<Capabilities>,
status: Status,
request_tx: PeerRequestSender,
) -> Result<(), AddSessionError> {
// TODO add capacity check
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");
self.state_fetcher.new_connected_peer(peer, status.blockhash);
self.connected_peers.insert(
peer,
ConnectedPeer {
best_hash: status.blockhash,
capabilities,
request_tx,
pending_response: None,
},
);
Ok(())
}
/// Event hook for a disconnected session for the peer.
pub(crate) fn on_session_closed(&mut self, _node_id: NodeId) {}
pub(crate) fn on_session_closed(&mut self, peer: NodeId) {
self.connected_peers.remove(&peer);
self.state_fetcher.on_session_closed(&peer);
}
/// Propagates Block to peers.
pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) {
@ -103,21 +133,75 @@ where
fn on_peer_action(&mut self, action: PeerAction) {
match action {
PeerAction::Connect { node_id, remote_addr } => {
self.peers_state.insert(node_id, PeerSessionState::Connecting);
self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr });
}
PeerAction::Disconnect { node_id } => {
self.peers_state.remove(&node_id);
self.state_fetcher.on_pending_disconnect(&node_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id });
}
}
}
/// Disconnect the session
fn disconnect_session(&mut self, _node: NodeId) {}
fn on_session_disconnected(&mut self, peer: NodeId) {
self.connected_peers.remove(&peer);
}
/// Sends The message to the peer's session and queues in a response.
///
/// Caution: this will replace an already pending response. It's the responsibility of the
/// caller to select the peer.
fn handle_block_request(&mut self, peer: NodeId, request: BlockRequest) {
if let Some(ref mut peer) = self.connected_peers.get_mut(&peer) {
let (request, response) = match request {
BlockRequest::GetBlockHeaders(request) => {
let (response, rx) = oneshot::channel();
let request = PeerRequest::GetBlockHeaders { request, response };
let response = PeerResponse::BlockHeaders { response: rx };
(request, response)
}
BlockRequest::GetBlockBodies(request) => {
let (response, rx) = oneshot::channel();
let request = PeerRequest::GetBlockBodies { request, response };
let response = PeerResponse::BlockBodies { response: rx };
(request, response)
}
};
let _ = peer.request_tx.to_session_tx.try_send(request);
peer.pending_response = Some(response);
}
}
/// Handle the outcome of processed response, for example directly queue another request.
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) -> Option<StateAction> {
match outcome {
BlockResponseOutcome::Request(peer, request) => {
self.handle_block_request(peer, request);
}
BlockResponseOutcome::BadResponse(_) => {
// TODO handle reputation change
}
}
None
}
/// Invoked when received a response from a connected peer.
fn on_eth_response(&mut self, _node: NodeId, _resp: EthResponse) {}
fn on_eth_response(&mut self, peer: NodeId, resp: PeerResponseResult) -> Option<StateAction> {
match resp {
PeerResponseResult::BlockHeaders(res) => {
let outcome = self.state_fetcher.on_block_headers_response(peer, res)?;
return self.on_block_response_outcome(outcome)
}
PeerResponseResult::BlockBodies(res) => {
let outcome = self.state_fetcher.on_block_bodies_response(peer, res)?;
return self.on_block_response_outcome(outcome)
}
PeerResponseResult::PooledTransactions(_) => {}
PeerResponseResult::NodeData(_) => {}
PeerResponseResult::Receipts(_) => {}
}
None
}
/// Advances the state
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction> {
@ -137,7 +221,6 @@ where
for (id, peer) in self.connected_peers.iter_mut() {
if let Some(response) = peer.pending_response.as_mut() {
match response.poll(cx) {
Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)),
Poll::Ready(Err(_)) => {
trace!(
?id,
@ -146,6 +229,7 @@ where
);
disconnect_sessions.push(*id);
}
Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)),
Poll::Pending => continue,
};
}
@ -155,11 +239,13 @@ where
}
for node in disconnect_sessions {
self.disconnect_session(node)
self.on_session_disconnected(node)
}
for (id, resp) in received_responses {
self.on_eth_response(id, resp);
if let Some(action) = self.on_eth_response(id, resp) {
self.queued_messages.push_back(action);
}
}
// poll peer manager
@ -180,34 +266,14 @@ where
pub struct ConnectedPeer {
/// Best block of the peer.
pub(crate) best_hash: H256,
/// Best block number of the peer.
pub(crate) best_number: U256,
/// A communication channel directly to the session service.
pub(crate) message_tx: PeerRequestSender,
/// The capabilities of the connected peer.
pub(crate) capabilities: Arc<Capabilities>,
/// A communication channel directly to the session task.
pub(crate) request_tx: PeerRequestSender,
/// The response receiver for a currently active request to that peer.
pub(crate) pending_response: Option<PeerResponse>,
}
/// Tracks the current state of the peer session
pub enum PeerSessionState {
/// Starting state for outbound connections.
///
/// This will be triggered by a [`PeerAction::Connect`] action.
/// The peer will reside in the state until the connection has been authenticated.
Connecting,
/// Established connection that hasn't been authenticated yet.
Incoming {
/// How long to keep this open.
until: Instant,
sender: PeerRequestSender,
},
/// Node is connected to the peer and is ready to
Ready {
/// Communication channel directly to the session task
sender: PeerRequestSender,
},
}
/// Message variants triggered by the [`State`]
pub enum StateAction {
/// Create a new connection to the given node.
@ -215,3 +281,12 @@ pub enum StateAction {
/// Disconnect an existing connection
Disconnect { node_id: NodeId },
}
#[derive(Debug, thiserror::Error)]
pub enum AddSessionError {
#[error("No capacity for new sessions")]
AtCapacity {
/// The peer of the session
peer: NodeId,
},
}

View File

@ -1,7 +1,7 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
session::{SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
state::{AddSessionError, NetworkState, StateAction},
NodeId,
};
use futures::Stream;
@ -68,10 +68,15 @@ where
capabilities,
status,
messages,
} => {
self.state.on_session_authenticated(node_id, capabilities, status, messages);
Some(SwarmEvent::SessionEstablished { node_id, remote_addr })
} => match self.state.on_session_activated(node_id, capabilities, status, messages) {
Ok(_) => Some(SwarmEvent::SessionEstablished { node_id, remote_addr }),
Err(err) => {
match err {
AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer),
};
None
}
},
SessionEvent::ValidMessage { node_id, message } => {
Some(SwarmEvent::CapabilityMessage { node_id, message })
}