feat(net): add eth request handler (#293)

* feat(net): add block request manager

* refactor: change ommers type to Header

* feat: impl handlers

* rename to EthRequestHandler

* impl checks

* fix: skip as step

* chore: rustfmt
This commit is contained in:
Matthias Seitz
2022-11-30 21:19:50 +01:00
committed by GitHub
parent d0e9e6fe80
commit 480097a7ca
9 changed files with 366 additions and 22 deletions

View File

@ -16,7 +16,7 @@ use tokio_stream::Stream;
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// An un-authenticated [`EthStream`]. This is consumed and returns a [`EthStream`] after the
/// `Status` handshake is completed.

View File

@ -22,6 +22,6 @@ pub mod types;
pub use types::*;
pub use crate::{
ethstream::{EthStream, UnauthedEthStream},
ethstream::{EthStream, UnauthedEthStream, MAX_MESSAGE_SIZE},
p2pstream::{DisconnectReason, HelloMessage, P2PStream, ProtocolVersion, UnauthedP2PStream},
};

View File

@ -13,7 +13,7 @@ use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrap
///
/// If the [`skip`](#structfield.skip) field is non-zero, the peer must skip that amount of headers
/// in the direction specified by [`reverse`](#structfield.reverse).
#[derive(Copy, Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, RlpEncodable, RlpDecodable)]
pub struct GetBlockHeaders {
/// The block number or hash that the peer should start returning headers from.
pub start_block: BlockHashOrNumber,
@ -94,8 +94,6 @@ impl From<Vec<BlockBody>> for BlockBodies {
#[cfg(test)]
mod test {
use std::str::FromStr;
use crate::types::{
message::RequestPair, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
};
@ -105,6 +103,7 @@ mod test {
TxLegacy, U256,
};
use reth_rlp::{Decodable, Encodable};
use std::str::FromStr;
use super::BlockBody;

View File

@ -29,13 +29,21 @@ impl<T: Hash + Eq> LruCache<T> {
if self.inner.insert(entry) {
if self.limit.get() == self.inner.len() {
// remove the oldest element in the set
self.inner.pop_front();
self.remove_lru();
}
return true
}
false
}
/// Remove the least recently used entry and return it.
///
/// If the `LruCache` is empty this will return None.
#[inline]
fn remove_lru(&mut self) {
self.inner.pop_front();
}
/// Returns `true` if the set contains a value.
pub fn contains<Q: ?Sized>(&self, value: &Q) -> bool
where

View File

@ -0,0 +1,296 @@
//! Blocks/Headers management for the p2p network.
use crate::peers::PeersHandle;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, NodeData, Receipts,
};
use reth_interfaces::{
p2p::error::RequestResult,
provider::{BlockProvider, HeaderProvider},
};
use reth_primitives::{BlockHashOrNumber, Header, PeerId};
use std::{
borrow::Borrow,
future::Future,
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
/// Maximum number of block headers to serve.
///
/// Used to limit lookups.
const MAX_HEADERS_SERVE: usize = 1024;
/// Maximum number of block headers to serve.
///
/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
/// SOFT_RESPONSE_LIMIT.
const MAX_BODIES_SERVE: usize = 1024;
/// Estimated size in bytes of an RLP encoded body.
// TODO: check 24kb blocksize assumption
const APPROX_BODY_SIZE: usize = 24 * 1024;
/// Maximum size of replies to data retrievals.
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
/// Estimated size in bytes of an RLP encoded header.
const APPROX_HEADER_SIZE: usize = 500;
/// Manages eth related requests on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service.
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C> {
/// The client type that can interact with the chain.
client: Arc<C>,
/// Used for reporting peers.
#[allow(unused)]
// TODO use to report spammers
peers: PeersHandle,
/// Incoming request from the [`NetworkManager`].
incoming_requests: UnboundedReceiverStream<IncomingEthRequest>,
}
// === impl EthRequestHandler ===
impl<C> EthRequestHandler<C>
where
C: BlockProvider + HeaderProvider,
{
/// Create a new instance
pub fn new(
client: Arc<C>,
peers: PeersHandle,
incoming: UnboundedReceiver<IncomingEthRequest>,
) -> Self {
Self { client, peers, incoming_requests: UnboundedReceiverStream::new(incoming) }
}
/// Returns the list of requested heders
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<Header> {
let GetBlockHeaders { start_block, limit, skip, reverse } = request;
let direction = HeadersDirection::new(reverse);
let mut headers = Vec::new();
let mut block: BlockHashOrNumber = match start_block {
BlockHashOrNumber::Hash(start) => start.into(),
BlockHashOrNumber::Number(num) => {
if let Some(hash) = self.client.block_hash(num.into()).unwrap_or_default() {
hash.into()
} else {
return headers
}
}
};
let skip = skip as u64;
let mut total_bytes = APPROX_HEADER_SIZE;
for _ in 0..limit {
if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
match direction {
HeadersDirection::Rising => {
if let Some(next) = (header.number + 1).checked_add(skip) {
block = next.into()
} else {
break
}
}
HeadersDirection::Falling => {
if skip > 0 {
// prevent under flows for block.number == 0 and `block.number - skip <
// 0`
if let Some(next) =
header.number.checked_sub(1).and_then(|num| num.checked_sub(skip))
{
block = next.into()
} else {
break
}
} else {
block = header.parent_hash.into()
}
}
}
headers.push(header);
if headers.len() >= MAX_HEADERS_SERVE {
break
}
total_bytes += APPROX_HEADER_SIZE;
if total_bytes > SOFT_RESPONSE_LIMIT {
break
}
} else {
break
}
}
headers
}
fn on_headers_request(
&mut self,
_peer_id: PeerId,
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders>>,
) {
let headers = self.get_headers_response(request);
let _ = response.send(Ok(BlockHeaders(headers)));
}
fn on_bodies_request(
&mut self,
_peer_id: PeerId,
request: GetBlockBodies,
response: oneshot::Sender<RequestResult<BlockBodies>>,
) {
let mut bodies = Vec::new();
let mut total_bytes = APPROX_BODY_SIZE;
for hash in request.0 {
if let Some(block) = self.client.block(hash.into()).unwrap_or_default() {
let body = BlockBody { transactions: block.body, ommers: block.ommers };
bodies.push(body);
total_bytes += APPROX_BODY_SIZE;
if total_bytes > SOFT_RESPONSE_LIMIT {
break
}
if bodies.len() >= MAX_BODIES_SERVE {
break
}
} else {
break
}
}
let _ = response.send(Ok(BlockBodies(bodies)));
}
}
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
impl<C> Future for EthRequestHandler<C>
where
C: BlockProvider + HeaderProvider,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match this.incoming_requests.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(incoming)) => match incoming {
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
this.on_headers_request(peer_id, request, response)
}
IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
this.on_bodies_request(peer_id, request, response)
}
IncomingEthRequest::GetNodeData { .. } => {}
IncomingEthRequest::GetReceipts { .. } => {}
},
}
}
}
}
/// Represents the direction for a headers request depending on the `reverse` field of the request.
///
/// [`HeadersDirection::Rising`] block numbers for `reverse == true`
/// [`HeadersDirection::Falling`] block numbers for `reverse == false`
///
/// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockheaders-0x03>
#[derive(Copy, Clone)]
pub enum HeadersDirection {
/// Rising block number.
Rising,
/// Falling block number.
Falling,
}
impl HeadersDirection {
fn new(reverse: bool) -> Self {
if reverse {
HeadersDirection::Rising
} else {
HeadersDirection::Falling
}
}
}
/// Represents a handled [`GetBlockHeaders`] requests
///
/// This is the key type for spam detection cache. The counter is ignored during `PartialEq` and
/// `Hash`.
#[derive(Debug, PartialEq, Hash)]
#[allow(unused)]
struct RespondedGetBlockHeaders {
req: (PeerId, GetBlockHeaders),
}
impl Borrow<(PeerId, GetBlockHeaders)> for RespondedGetBlockHeaders {
fn borrow(&self) -> &(PeerId, GetBlockHeaders) {
&self.req
}
}
/// All `eth` request related to blocks delegated by the network.
#[derive(Debug)]
#[allow(missing_docs)]
pub enum IncomingEthRequest {
/// Request Block headers from the peer.
///
/// The response should be sent through the channel.
GetBlockHeaders {
peer_id: PeerId,
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders>>,
},
/// Request Block headers from the peer.
///
/// The response should be sent through the channel.
GetBlockBodies {
peer_id: PeerId,
request: GetBlockBodies,
response: oneshot::Sender<RequestResult<BlockBodies>>,
},
/// Request Node Data from the peer.
///
/// The response should be sent through the channel.
GetNodeData {
peer_id: PeerId,
request: GetNodeData,
response: oneshot::Sender<RequestResult<NodeData>>,
},
/// Request Receipts from the peer.
///
/// The response should be sent through the channel.
GetReceipts {
peer_id: PeerId,
request: GetReceipts,
response: oneshot::Sender<RequestResult<Receipts>>,
},
}

View File

@ -17,6 +17,7 @@ mod cache;
mod config;
mod discovery;
pub mod error;
pub mod eth_requests;
mod fetch;
mod import;
mod listener;

View File

@ -19,6 +19,7 @@ use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
@ -88,7 +89,9 @@ pub struct NetworkManager<C> {
/// All listeners for [`Network`] events.
event_listeners: NetworkEventListeners,
/// Sender half to send events to the [`TransactionsManager`] task, if configured.
to_transactions: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>,
to_transactions_manager: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>,
/// Sender half to send events to the [`EthRequestHandler`] task, if configured.
to_eth_request_handler: Option<mpsc::UnboundedSender<IncomingEthRequest>>,
/// Tracks the number of active session (connected peers).
///
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
@ -154,14 +157,20 @@ where
from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
block_import,
event_listeners: Default::default(),
to_transactions: None,
to_transactions_manager: None,
to_eth_request_handler: None,
num_active_peers,
})
}
/// Sets the dedicated channel for events indented for the [`TransactionsManager`]
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
self.to_transactions = Some(tx);
self.to_transactions_manager = Some(tx);
}
/// Sets the dedicated channel for events indented for the [`EthRequestHandler`]
pub fn set_eth_request_handler(&mut self, tx: mpsc::UnboundedSender<IncomingEthRequest>) {
self.to_eth_request_handler = Some(tx);
}
/// Returns the [`SocketAddr`] that listens for incoming connections.
@ -221,16 +230,49 @@ where
/// Sends an event to the [`TransactionsManager`] if configured
fn notify_tx_manager(&self, event: NetworkTransactionEvent) {
if let Some(ref tx) = self.to_transactions {
if let Some(ref tx) = self.to_transactions_manager {
let _ = tx.send(event);
}
}
/// Sends an event to the [`EthRequestManager`] if configured
fn delegate_eth_request(&self, event: IncomingEthRequest) {
if let Some(ref reqs) = self.to_eth_request_handler {
let _ = reqs.send(event);
}
}
/// Handle an incoming request from the peer
fn on_eth_request(&mut self, peer_id: PeerId, req: PeerRequest) {
match req {
PeerRequest::GetBlockHeaders { .. } => {}
PeerRequest::GetBlockBodies { .. } => {}
PeerRequest::GetBlockHeaders { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
peer_id,
request,
response,
})
}
PeerRequest::GetBlockBodies { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
peer_id,
request,
response,
})
}
PeerRequest::GetNodeData { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetNodeData {
peer_id,
request,
response,
})
}
PeerRequest::GetReceipts { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetReceipts {
peer_id,
request,
response,
})
}
PeerRequest::GetPooledTransactions { request, response } => {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,
@ -238,8 +280,6 @@ where
response,
});
}
PeerRequest::GetNodeData { .. } => {}
PeerRequest::GetReceipts { .. } => {}
}
}

View File

@ -1,4 +1,4 @@
//! Transaction management for the p2p network.
//! Transactions management for the p2p network.
use crate::{
cache::LruCache,

View File

@ -46,7 +46,7 @@ impl Deref for BlockLocked {
}
/// Either a block hash _or_ a block number
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum BlockHashOrNumber {
/// A block hash
Hash(H256),
@ -68,18 +68,18 @@ impl From<u64> for BlockHashOrNumber {
/// Allows for RLP encoding of either a block hash or block number
impl Encodable for BlockHashOrNumber {
fn length(&self) -> usize {
match self {
Self::Hash(block_hash) => block_hash.length(),
Self::Number(block_number) => block_number.length(),
}
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
match self {
Self::Hash(block_hash) => block_hash.encode(out),
Self::Number(block_number) => block_number.encode(out),
}
}
fn length(&self) -> usize {
match self {
Self::Hash(block_hash) => block_hash.length(),
Self::Number(block_number) => block_number.length(),
}
}
}
/// Allows for RLP decoding of a block hash or block number