feat: add missing message handling (#200)

* feat: add missing message handling

* refactor: new block message handling

* feat: add events and commands for transaction handling

* more work in transactions

* chore: silence warnings
This commit is contained in:
Matthias Seitz
2022-11-15 10:34:28 +01:00
committed by GitHub
parent 92a7818512
commit b60ced1de1
16 changed files with 655 additions and 129 deletions

10
Cargo.lock generated
View File

@ -2155,6 +2155,15 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linked_hash_set"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47186c6da4d81ca383c7c47c1bfc80f4b95f4720514d860a5407aaf4233f9588"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "lock_api"
version = "0.4.9"
@ -3302,6 +3311,7 @@ dependencies = [
"either",
"fnv",
"futures",
"linked_hash_set",
"parking_lot 0.12.1",
"pin-project",
"rand 0.8.5",

View File

@ -11,6 +11,8 @@ pub enum RequestError {
ChannelClosed,
#[error("Not connected to the peer.")]
NotConnected,
#[error("Connection to a peer dropped while handling the request.")]
ConnectionDropped,
#[error("Capability Message is not supported by remote peer.")]
UnsupportedCapability,
#[error("Request timed out while awaiting response.")]

View File

@ -35,6 +35,7 @@ parking_lot = "0.12"
async-trait = "0.1"
bytes = "1.2"
either = "1.8"
linked_hash_set = "0.1"
secp256k1 = { version = "0.24", features = [
"global-context",

View File

@ -0,0 +1,58 @@
use linked_hash_set::LinkedHashSet;
use std::{borrow::Borrow, hash::Hash, num::NonZeroUsize};
/// A minimal LRU cache based on a `LinkedHashSet` with limited capacity.
///
/// If the length exceeds the set capacity, the oldest element will be removed
/// In the limit, for each element inserted the oldest existing element will be removed.
#[derive(Debug, Clone)]
pub struct LruCache<T: Hash + Eq> {
limit: NonZeroUsize,
inner: LinkedHashSet<T>,
}
impl<T: Hash + Eq> LruCache<T> {
/// Creates a new `LruCache` using the given limit
pub fn new(limit: NonZeroUsize) -> Self {
Self { inner: LinkedHashSet::new(), limit }
}
/// Insert an element into the set.
///
/// If the element is new (did not exist before [`LruCache::insert()`]) was called, then the
/// given length will be enforced and the oldest element will be removed if the limit was
/// exceeded.
///
/// If the set did not have this value present, true is returned.
/// If the set did have this value present, false is returned.
pub fn insert(&mut self, entry: T) -> bool {
if self.inner.insert(entry) {
if self.limit.get() == self.inner.len() {
// remove the oldest element in the set
self.inner.pop_front();
}
return true
}
false
}
/// Returns `true` if the set contains a value.
pub fn contains<Q: ?Sized>(&self, value: &Q) -> bool
where
T: Borrow<Q>,
Q: Hash + Eq,
{
self.inner.contains(value)
}
}
impl<T> Extend<T> for LruCache<T>
where
T: Eq + Hash,
{
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
for item in iter.into_iter() {
self.insert(item);
}
}
}

View File

@ -1,4 +1,8 @@
use crate::{peers::PeersConfig, session::SessionsConfig};
use crate::{
import::{BlockImport, NoopBlockImport},
peers::PeersConfig,
session::SessionsConfig,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_primitives::{Chain, ForkId, H256};
use secp256k1::SecretKey;
@ -30,6 +34,8 @@ pub struct NetworkConfig<C> {
pub chain: Chain,
/// Genesis hash of the network
pub genesis_hash: H256,
/// The block importer type.
pub block_import: Box<dyn BlockImport>,
}
// === impl NetworkConfig ===
@ -82,6 +88,8 @@ pub struct NetworkConfigBuilder<C> {
chain: Chain,
/// Network genesis hash
genesis_hash: H256,
/// The block importer type.
block_import: Box<dyn BlockImport>,
}
// === impl NetworkConfigBuilder ===
@ -100,6 +108,7 @@ impl<C> NetworkConfigBuilder<C> {
fork_id: None,
chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet),
genesis_hash: Default::default(),
block_import: Box::<NoopBlockImport>::default(),
}
}
@ -109,6 +118,12 @@ impl<C> NetworkConfigBuilder<C> {
self
}
/// Sets the [`BlockImport`] type to configure.
pub fn block_import<T: BlockImport + 'static>(mut self, block_import: T) -> Self {
self.block_import = Box::new(block_import);
self
}
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let Self {
@ -122,6 +137,7 @@ impl<C> NetworkConfigBuilder<C> {
fork_id,
chain,
genesis_hash,
block_import,
} = self;
NetworkConfig {
client,
@ -138,6 +154,7 @@ impl<C> NetworkConfigBuilder<C> {
fork_id,
chain,
genesis_hash,
block_import,
}
}
}

View File

@ -1,10 +1,13 @@
//! Fetch data from the network.
use crate::message::BlockRequest;
use crate::{message::BlockRequest, peers::ReputationChange};
use futures::StreamExt;
use reth_eth_wire::{BlockBody, EthMessage};
use reth_interfaces::p2p::{error::RequestResult, headers::client::HeadersRequest};
use reth_primitives::{Header, PeerId, H256, U256};
use reth_eth_wire::{BlockBody, GetBlockBodies};
use reth_interfaces::p2p::{
error::{RequestError, RequestResult},
headers::client::HeadersRequest,
};
use reth_primitives::{Header, PeerId, H256};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
@ -20,6 +23,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
pub struct StateFetcher {
/// Currently active [`GetBlockHeaders`] requests
inflight_headers_requests: HashMap<PeerId, Request<HeadersRequest, RequestResult<Vec<Header>>>>,
/// Currently active [`GetBlockBodies`] requests
inflight_bodies_requests: HashMap<PeerId, Request<Vec<H256>, RequestResult<Vec<BlockBody>>>>,
/// The list of available peers for requests.
peers: HashMap<PeerId, Peer>,
/// Requests queued for processing
@ -34,26 +39,55 @@ pub struct StateFetcher {
impl StateFetcher {
/// Invoked when connected to a new peer.
pub(crate) fn new_connected_peer(&mut self, _node_id: PeerId, _best_hash: H256) {}
pub(crate) fn new_connected_peer(
&mut self,
peer_id: PeerId,
best_hash: H256,
best_number: Option<u64>,
) {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
}
/// Invoked when an active session was closed.
pub(crate) fn on_session_closed(&mut self, _peer: &PeerId) {}
///
/// This cancels als inflight request and sends an error to the receiver.
pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
self.peers.remove(peer);
if let Some(req) = self.inflight_headers_requests.remove(peer) {
let _ = req.response.send(Err(RequestError::ConnectionDropped));
}
if let Some(req) = self.inflight_bodies_requests.remove(peer) {
let _ = req.response.send(Err(RequestError::ConnectionDropped));
}
}
/// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, _peer: &PeerId) {}
pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.state = PeerState::Closing;
}
}
/// Returns the next idle peer that's ready to accept a request
fn next_peer(&mut self) -> Option<(&PeerId, &mut Peer)> {
self.peers.iter_mut().find(|(_, peer)| peer.state.is_idle())
}
/// Returns the next action to return
fn poll_action(&mut self) -> Option<FetchAction> {
// TODO find matching peers
// if let Some(request) = self.queued_requests.pop_front() {
// if let Some(action) = self.on_download_request(request) {
// return Poll::Ready(action)
// }
// }
None
if self.queued_requests.is_empty() {
return None
}
let peer_id = *self.next_peer()?.0;
let request = self.queued_requests.pop_front().expect("not empty; qed");
let request = self.prepare_block_request(peer_id, request);
Some(FetchAction::BlockRequest { peer_id, request })
}
/// Received a request via a downloader
fn on_download_request(&mut self, request: DownloadRequest) -> Option<FetchAction> {
match request {
DownloadRequest::GetBlockHeaders { request: _, response: _ } => {}
@ -91,21 +125,79 @@ impl StateFetcher {
Poll::Pending
}
/// Handles a new request to a peer.
///
/// Caution: this assumes the peer exists and is idle
fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest) -> BlockRequest {
// update the peer's state
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state = req.peer_state();
}
let started = Instant::now();
match req {
DownloadRequest::GetBlockHeaders { request, response } => {
let inflight = Request { request, response, started };
self.inflight_headers_requests.insert(peer_id, inflight);
unimplemented!("unify start types");
// BlockRequest::GetBlockHeaders(GetBlockHeaders {
// // TODO: this should be converted
// start_block: BlockHashOrNumber::Number(0),
// limit: request.limit,
// skip: 0,
// reverse: request.reverse,
// })
}
DownloadRequest::GetBlockBodies { request, response } => {
let inflight = Request { request: request.clone(), response, started };
self.inflight_bodies_requests.insert(peer_id, inflight);
BlockRequest::GetBlockBodies(GetBlockBodies(request))
}
}
}
/// Returns a new followup request for the peer.
///
/// Caution: this expects that the peer is _not_ closed
fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
let req = self.queued_requests.pop_front()?;
let req = self.prepare_block_request(peer_id, req);
Some(BlockResponseOutcome::Request(peer_id, req))
}
/// Called on a `GetBlockHeaders` response from a peer
pub(crate) fn on_block_headers_response(
&mut self,
_peer: PeerId,
_res: RequestResult<Vec<Header>>,
peer_id: PeerId,
res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> {
if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) {
let _ = resp.response.send(res);
}
if let Some(peer) = self.peers.get_mut(&peer_id) {
if peer.state.on_request_finished() {
return self.followup_request(peer_id)
}
}
None
}
/// Called on a `GetBlockBodies` response from a peer
pub(crate) fn on_block_bodies_response(
&mut self,
_peer: PeerId,
_res: RequestResult<Vec<BlockBody>>,
peer_id: PeerId,
res: RequestResult<Vec<BlockBody>>,
) -> Option<BlockResponseOutcome> {
if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
let _ = resp.response.send(res);
}
if let Some(peer) = self.peers.get_mut(&peer_id) {
if peer.state.on_request_finished() {
return self.followup_request(peer_id)
}
}
None
}
@ -120,6 +212,7 @@ impl Default for StateFetcher {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
peers: Default::default(),
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
@ -148,14 +241,12 @@ impl HeadersDownloader {
/// Represents a connected peer
struct Peer {
/// Identifier for requests.
request_id: u64,
/// The state this peer currently resides in.
state: PeerState,
/// Best known hash that the peer has
best_hash: H256,
/// Best known number the peer has.
best_number: U256,
/// Tracks the best number of the peer.
best_number: Option<u64>,
}
/// Tracks the state of an individual peer
@ -164,6 +255,32 @@ enum PeerState {
Idle,
/// Peer is handling a `GetBlockHeaders` request.
GetBlockHeaders,
/// Peer is handling a `GetBlockBodies` request.
GetBlockBodies,
/// Peer session is about to close
Closing,
}
// === impl PeerState ===
impl PeerState {
/// Returns true if the peer is currently idle.
fn is_idle(&self) -> bool {
matches!(self, PeerState::Idle)
}
/// Resets the state on a received response.
///
/// If the state was already marked as `Closing` do nothing.
///
/// Returns `true` if the peer is ready for another request.
fn on_request_finished(&mut self) -> bool {
if !matches!(self, PeerState::Closing) {
*self = PeerState::Idle;
return true
}
false
}
}
/// A request that waits for a response from the network so it can send it back through the response
@ -185,13 +302,26 @@ enum DownloadRequest {
GetBlockBodies { request: Vec<H256>, response: oneshot::Sender<RequestResult<Vec<BlockBody>>> },
}
// === impl DownloadRequest ===
impl DownloadRequest {
/// Returns the corresponding state for a peer that handles the request.
fn peer_state(&self) -> PeerState {
match self {
DownloadRequest::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
DownloadRequest::GetBlockBodies { .. } => PeerState::GetBlockBodies,
}
}
}
/// An action the syncer can emit.
pub(crate) enum FetchAction {
/// Dispatch an eth request to the given peer.
EthRequest {
node_id: PeerId,
BlockRequest {
/// The targeted recipient for the request
peer_id: PeerId,
/// The request to send
request: EthMessage,
request: BlockRequest,
},
}
@ -202,7 +332,6 @@ pub(crate) enum FetchAction {
pub(crate) enum BlockResponseOutcome {
/// Continue with another request to the peer.
Request(PeerId, BlockRequest),
/// How to handle a bad response
// TODO this should include some form of reputation change
BadResponse(PeerId),
/// How to handle a bad response and the reputation change to apply.
BadResponse(PeerId, ReputationChange),
}

View File

@ -0,0 +1,42 @@
use crate::message::NewBlockMessage;
use reth_primitives::PeerId;
use std::task::{Context, Poll};
/// Abstraction over block import.
pub trait BlockImport: Send + Sync {
/// Invoked for a received `NewBlock` broadcast message from the peer.
///
/// > When a `NewBlock` announcement message is received from a peer, the client first verifies
/// > the basic header validity of the block, checking whether the proof-of-work value is valid.
///
/// This is supposed to start verification. The results are then expected to be returned via
/// [`BlockImport::poll`].
fn on_new_block(&mut self, peer_id: PeerId, incoming_block: NewBlockMessage);
/// Returns the results of a [`BlockImport::on_new_block`]
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BlockImportOutcome>;
}
/// Outcome of the [`BlockImport`]'s block handling.
pub struct BlockImportOutcome {
/// Sender of the `NewBlock` message.
pub peer: PeerId,
/// The result after validating the block
pub result: Result<NewBlockMessage, BlockImportError>,
}
/// Represents the error case of a failed block import
pub enum BlockImportError {}
/// An implementation of `BlockImport` that does nothing
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopBlockImport;
impl BlockImport for NoopBlockImport {
fn on_new_block(&mut self, _peer_id: PeerId, _incoming_block: NewBlockMessage) {}
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<BlockImportOutcome> {
Poll::Pending
}
}

View File

@ -15,10 +15,12 @@
//! port of that network. This includes public identities (public key) and addresses (where to reach
//! them).
mod cache;
mod config;
mod discovery;
pub mod error;
mod fetch;
mod import;
mod listener;
mod manager;
mod message;

View File

@ -19,7 +19,9 @@ use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
import::{BlockImport, BlockImportOutcome},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
session::SessionManager,
@ -30,9 +32,9 @@ use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
EthMessage,
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
};
use reth_interfaces::provider::BlockProvider;
use reth_interfaces::{p2p::error::RequestResult, provider::BlockProvider};
use reth_primitives::PeerId;
use std::{
net::SocketAddr,
@ -43,7 +45,7 @@ use std::{
},
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, trace};
@ -77,8 +79,8 @@ pub struct NetworkManager<C> {
handle: NetworkHandle,
/// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage>,
/// Handles block imports.
block_import_sink: (),
/// Handles block imports according to the `eth` protocol.
block_import: Box<dyn BlockImport>,
/// The address of this node that listens for incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// All listeners for [`Network`] events.
@ -112,6 +114,7 @@ where
peers_config,
sessions_config,
genesis_hash,
block_import,
..
} = config;
@ -145,7 +148,7 @@ where
swarm,
handle,
from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
block_import_sink: (),
block_import,
listener_address,
event_listeners: Default::default(),
num_active_peers,
@ -171,41 +174,47 @@ where
// TODO: disconnect?
}
/// Handles a received [`CapabilityMessage`] from the peer.
fn on_capability_message(&mut self, _node_id: PeerId, msg: CapabilityMessage) {
match msg {
CapabilityMessage::Eth(eth) => {
match eth {
EthMessage::Status(_) => {}
EthMessage::NewBlockHashes(_) => {
// update peer's state, to track what blocks this peer has seen
/// Handle an incoming request from the peer
fn on_eth_request(&mut self, peer_id: PeerId, req: PeerRequest) {
match req {
PeerRequest::GetBlockHeaders { .. } => {}
PeerRequest::GetBlockBodies { .. } => {}
PeerRequest::GetPooledTransactions { request, response } => {
// notify listeners about this request
self.event_listeners.send(NetworkEvent::GetPooledTransactions {
peer_id,
request,
response: Arc::new(response),
});
}
EthMessage::NewBlock(_) => {
// emit new block and track that the peer knows this block
PeerRequest::GetNodeData { .. } => {}
PeerRequest::GetReceipts { .. } => {}
}
EthMessage::Transactions(_) => {
// need to emit this as event/send to tx handler
}
EthMessage::NewPooledTransactionHashes(_) => {
// need to emit this as event/send to tx handler
}
// TODO: should remove the response types here, as they are handled separately
EthMessage::GetBlockHeaders(_) => {}
EthMessage::BlockHeaders(_) => {}
EthMessage::GetBlockBodies(_) => {}
EthMessage::BlockBodies(_) => {}
EthMessage::GetPooledTransactions(_) => {}
EthMessage::PooledTransactions(_) => {}
EthMessage::GetNodeData(_) => {}
EthMessage::NodeData(_) => {}
EthMessage::GetReceipts(_) => {}
EthMessage::Receipts(_) => {}
/// Handles a received Message from the peer.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(hashes) => {
// update peer's state, to track what blocks this peer has seen
self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
}
PeerMessage::NewBlock(block) => {
self.swarm.state_mut().on_new_block(peer_id, block.hash);
// start block import process
self.block_import.on_new_block(peer_id, block);
}
CapabilityMessage::Other(_) => {
// other subprotocols
PeerMessage::PooledTransactions(msg) => {
self.event_listeners
.send(NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg });
}
PeerMessage::Transactions(msg) => {
self.event_listeners.send(NetworkEvent::IncomingTransactions { peer_id, msg });
}
PeerMessage::EthRequest(req) => {
self.on_eth_request(peer_id, req);
}
PeerMessage::Other(_) => {}
}
}
@ -215,10 +224,25 @@ where
NetworkHandleMessage::EventListener(tx) => {
self.event_listeners.listeners.push(tx);
}
NetworkHandleMessage::NewestBlock(_, _) => {}
_ => {}
NetworkHandleMessage::AnnounceBlock(block, hash) => {
let msg = NewBlockMessage { hash, block: Arc::new(block) };
self.swarm.state_mut().announce_new_block(msg);
}
NetworkHandleMessage::EthRequest { peer_id, request } => {
self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
}
NetworkHandleMessage::SendTransaction { peer_id, msg } => {
self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::Transactions(msg))
}
NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
.swarm
.sessions_mut()
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
}
}
/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {}
}
impl<C> Future for NetworkManager<C>
@ -230,6 +254,11 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// poll new block imports
while let Poll::Ready(outcome) = this.block_import.poll(cx) {
this.on_block_import_result(outcome);
}
// process incoming messages from a handle
loop {
match this.from_handle_rx.poll_next_unpin(cx) {
@ -248,8 +277,8 @@ where
while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) {
// handle event
match event {
SwarmEvent::CapabilityMessage { node_id, message } => {
this.on_capability_message(node_id, message)
SwarmEvent::ValidMessage { node_id, message } => {
this.on_peer_message(node_id, message)
}
SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message } => {
this.on_invalid_message(node_id, capabilities, message)
@ -266,25 +295,38 @@ where
SwarmEvent::OutgoingTcpConnection { remote_addr } => {
trace!(?remote_addr, target = "net", "Starting outbound connection.");
}
SwarmEvent::SessionEstablished { node_id, remote_addr } => {
SwarmEvent::SessionEstablished {
node_id: peer_id,
remote_addr,
capabilities,
messages,
} => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
trace!(
?remote_addr,
?node_id,
?peer_id,
?total_active,
target = "net",
"Session established"
);
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
messages,
});
}
SwarmEvent::SessionClosed { node_id, remote_addr } => {
SwarmEvent::SessionClosed { node_id: peer_id, remote_addr } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
trace!(
?remote_addr,
?node_id,
?peer_id,
?total_active,
target = "net",
"Session disconnected"
);
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id });
}
SwarmEvent::IncomingPendingSessionClosed { .. } => {}
SwarmEvent::OutgoingPendingSessionClosed { .. } => {}
@ -292,14 +334,33 @@ where
}
}
todo!()
Poll::Pending
}
}
/// Events emitted by the network that are of interest for subscribers.
///
/// This includes any event types that may be relevant to tasks
#[derive(Debug, Clone)]
pub enum NetworkEvent {
EthMessage { node_id: PeerId, message: EthMessage },
/// Closed the peer session.
SessionClosed { peer_id: PeerId },
/// Established a new session with the given peer.
SessionEstablished {
peer_id: PeerId,
capabilities: Arc<Capabilities>,
messages: PeerRequestSender,
},
/// Received list of transactions to the given peer.
IncomingTransactions { peer_id: PeerId, msg: Arc<Transactions> },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,
request: GetPooledTransactions,
response: Arc<oneshot::Sender<RequestResult<PooledTransactions>>>,
},
}
/// Bundles all listeners for [`NetworkEvent`]s.

View File

@ -5,26 +5,38 @@
use futures::FutureExt;
use reth_eth_wire::{
BlockBodies, BlockBody, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NodeData, PooledTransactions,
Receipts, Transactions,
capability::CapabilityMessage, BlockBodies, BlockBody, BlockHeaders, GetBlockBodies,
GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes,
NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Transactions,
};
use std::task::{ready, Context, Poll};
use reth_eth_wire::capability::CapabilityMessage;
use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned};
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256};
use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
/// Internal form of a `NewBlock` message
#[derive(Debug, Clone)]
pub struct NewBlockMessage {
/// Hash of the block
pub hash: H256,
/// Raw received message
pub block: Arc<NewBlock>,
}
/// Represents all messages that can be sent to a peer session
#[derive(Debug)]
pub enum PeerMessage {
/// Announce new block hashes
NewBlockHashes(NewBlockHashes),
/// Broadcast new block.
NewBlock(Box<NewBlock>),
NewBlock(NewBlockMessage),
/// Broadcast transactions.
Transactions(Transactions),
Transactions(Arc<Transactions>),
///
PooledTransactions(Arc<NewPooledTransactionHashes>),
/// All `eth` request variants.
EthRequest(PeerRequest),
/// Other than eth namespace message

View File

@ -1,6 +1,7 @@
use crate::{manager::NetworkEvent, peers::PeersHandle};
use crate::{manager::NetworkEvent, message::PeerRequest, peers::PeersHandle};
use parking_lot::Mutex;
use reth_primitives::{PeerId, H256, U256};
use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions};
use reth_primitives::{PeerId, H256};
use std::{
net::SocketAddr,
sync::{atomic::AtomicUsize, Arc},
@ -47,6 +48,16 @@ impl NetworkHandle {
let _ = self.manager().send(NetworkHandleMessage::EventListener(tx));
rx
}
/// Sends a [`NetworkHandleMessage`] to the manager
fn send_message(&self, msg: NetworkHandleMessage) {
let _ = self.inner.to_manager_tx.send(msg);
}
/// Sends a [`PeerRequest`] to the given peer's session.
pub fn send_request(&mut self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}
}
struct NetworkInner {
@ -59,15 +70,25 @@ struct NetworkInner {
/// The identifier used by this node.
local_node_id: PeerId,
/// Access to the all the nodes
peers: PeersHandle, // TODO need something to access
peers: PeersHandle,
}
/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
#[allow(missing_docs)]
pub(crate) enum NetworkHandleMessage {
/// Add a new listener for [`NetworkEvent`].
EventListener(UnboundedSender<NetworkEvent>),
/// Broadcast event to announce a new block to all nodes.
AnnounceBlock,
/// Returns the newest imported block by the network.
NewestBlock(H256, U256),
AnnounceBlock(NewBlock, H256),
/// Sends the list of transactions to the given peer.
SendTransaction { peer_id: PeerId, msg: Arc<Transactions> },
/// Sends the list of transactions hashes to the given peer.
SendPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
/// Send an `eth` protocol request to the peer.
EthRequest {
/// The peer to send the request to.
peer_id: PeerId,
/// The request to send to the peer's sessions.
request: PeerRequest,
},
}

View File

@ -1,5 +1,8 @@
//! Session handles
use crate::session::{Direction, SessionId};
use crate::{
message::PeerMessage,
session::{Direction, SessionId},
};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
@ -93,7 +96,8 @@ pub(crate) enum PendingSessionEvent {
pub(crate) enum SessionCommand {
/// Disconnect the connection
Disconnect,
Message(CapabilityMessage),
/// Sends a message to the peer
Message(PeerMessage),
}
/// Message variants an active session can produce and send back to the
@ -107,7 +111,7 @@ pub(crate) enum ActiveSessionMessage {
/// Identifier of the remote peer.
node_id: PeerId,
/// Message received from the peer.
message: CapabilityMessage,
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {

View File

@ -1,9 +1,13 @@
//! Support for handling peer sessions.
pub use crate::message::PeerRequestSender;
use crate::session::{
use crate::{
message::PeerMessage,
session::{
active::ActiveSession,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
},
},
};
use fnv::FnvHashMap;
@ -194,6 +198,13 @@ impl SessionManager {
}
}
/// Sends a message to the peer's session
pub(crate) fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) {
if let Some(session) = self.active_sessions.get_mut(peer_id) {
let _ = session.commands_to_session.try_send(SessionCommand::Message(msg));
}
}
/// This polls all the session handles and returns [`SessionEvent`].
///
/// Active sessions are prioritized.
@ -406,7 +417,7 @@ pub(crate) enum SessionEvent {
ValidMessage {
node_id: PeerId,
/// Message received from the peer.
message: CapabilityMessage,
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {

View File

@ -1,29 +1,31 @@
//! Keeps track of the state of the network.
use crate::{
cache::LruCache,
discovery::{Discovery, DiscoveryEvent},
fetch::StateFetcher,
message::{PeerRequestSender, PeerResponse},
fetch::{BlockResponseOutcome, StateFetcher},
message::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
peers::{PeerAction, PeersManager},
};
use reth_eth_wire::{capability::Capabilities, Status};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{PeerId, H256};
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
num::NonZeroUsize,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use crate::{
fetch::BlockResponseOutcome,
message::{BlockRequest, PeerRequest, PeerResponseResult},
};
use tracing::trace;
/// Cache limit of blocks to keep track of for a single peer.
const PEER_BLOCK_CACHE_LIMIT: usize = 512;
/// The [`NetworkState`] keeps track of the state of all peers in the network.
///
/// This includes:
@ -90,7 +92,9 @@ where
// TODO add capacity check
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");
self.state_fetcher.new_connected_peer(peer, status.blockhash);
// find the corresponding block number
let block_number = self.client.block_number(status.blockhash).ok().flatten();
self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number);
self.connected_peers.insert(
peer,
@ -99,6 +103,7 @@ where
capabilities,
request_tx,
pending_response: None,
blocks: LruCache::new(NonZeroUsize::new(PEER_BLOCK_CACHE_LIMIT).unwrap()),
},
);
@ -111,12 +116,59 @@ where
self.state_fetcher.on_session_closed(&peer);
}
/// Propagates Block to peers.
pub(crate) fn announce_block(&mut self, _hash: H256, _block: ()) {
// TODO propagate the newblock messages to all connected peers that haven't seen the block
// yet
/// Starts propagating the new block to peers that haven't reported the block yet.
///
/// This is supposed to be invoked after the block was validated.
///
/// > It then sends the block to a small fraction of connected peers (usually the square root of
/// > the total number of peers) using the `NewBlock` message.
///
/// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage) {
// send a `NewBlock` message to a fraction fo the connected peers (square root of the total
// number of peers)
let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1;
todo!()
let mut count = 0;
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
// skip peers which already reported the block
continue
}
// Queue a `NewBlock` message for the peer
if count < num_propagate {
self.queued_messages
.push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
// mark the block as seen by the peer
peer.blocks.insert(msg.hash);
count += 1;
}
if count >= num_propagate {
break
}
}
}
/// Invoked after a `NewBlock` message was received by the peer.
///
/// This will keep track of blocks we know a peer has
pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: H256) {
// Mark the blocks as seen
if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
peer.blocks.insert(hash);
}
}
/// Invoked for a `NewBlockHashes` broadcast message.
pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
// Mark the blocks as seen
if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
}
}
/// Event hook for events received from the discovery service.
@ -183,7 +235,7 @@ where
BlockResponseOutcome::Request(peer, request) => {
self.handle_block_request(peer, request);
}
BlockResponseOutcome::BadResponse(_) => {
BlockResponseOutcome::BadResponse(_peer, _reputation_change) => {
// TODO handle reputation change
}
}
@ -277,10 +329,19 @@ pub struct ConnectedPeer {
pub(crate) request_tx: PeerRequestSender,
/// The response receiver for a currently active request to that peer.
pub(crate) pending_response: Option<PeerResponse>,
/// Blocks we know the peer has.
pub(crate) blocks: LruCache<H256>,
}
/// Message variants triggered by the [`State`]
pub enum StateAction {
/// Dispatch a `NewBlock` message to the peer
NewBlock {
/// Target of the message
peer_id: PeerId,
/// The `NewBlock` message
block: NewBlockMessage,
},
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection

View File

@ -1,5 +1,6 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
session::{SessionEvent, SessionId, SessionManager},
state::{AddSessionError, NetworkState, StateAction},
};
@ -56,6 +57,11 @@ where
&mut self.state
}
/// Mutable access to the [`SessionManager`].
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager {
&mut self.sessions
}
/// Triggers a new outgoing connection to the given node
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
self.sessions.dial_outbound(remote_addr, remote_id)
@ -70,8 +76,18 @@ where
capabilities,
status,
messages,
} => match self.state.on_session_activated(node_id, capabilities, status, messages) {
Ok(_) => Some(SwarmEvent::SessionEstablished { node_id, remote_addr }),
} => match self.state.on_session_activated(
node_id,
capabilities.clone(),
status,
messages.clone(),
) {
Ok(_) => Some(SwarmEvent::SessionEstablished {
node_id,
remote_addr,
capabilities,
messages,
}),
Err(err) => {
match err {
AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer),
@ -80,7 +96,7 @@ where
}
},
SessionEvent::ValidMessage { node_id, message } => {
Some(SwarmEvent::CapabilityMessage { node_id, message })
Some(SwarmEvent::ValidMessage { node_id, message })
}
SessionEvent::InvalidMessage { node_id, capabilities, message } => {
Some(SwarmEvent::InvalidCapabilityMessage { node_id, capabilities, message })
@ -133,6 +149,10 @@ where
StateAction::Disconnect { node_id } => {
self.sessions.disconnect(node_id);
}
StateAction::NewBlock { peer_id, block: msg } => {
let msg = PeerMessage::NewBlock(msg);
self.sessions.send_message(&peer_id, msg);
}
}
None
}
@ -191,11 +211,11 @@ where
/// network.
pub enum SwarmEvent {
/// Events related to the actual network protocol.
CapabilityMessage {
ValidMessage {
/// The peer that sent the message
node_id: PeerId,
/// Message received from the peer
message: CapabilityMessage,
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidCapabilityMessage {
@ -230,6 +250,8 @@ pub enum SwarmEvent {
SessionEstablished {
node_id: PeerId,
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
messages: PeerRequestSender,
},
SessionClosed {
node_id: PeerId,

View File

@ -1,12 +1,25 @@
//! Transaction management for the p2p network.
use crate::{manager::NetworkEvent, NetworkHandle};
use reth_primitives::{Transaction, H256};
use crate::{cache::LruCache, manager::NetworkEvent, message::PeerRequestSender, NetworkHandle};
use futures::stream::FuturesUnordered;
use reth_primitives::{PeerId, Transaction, H256};
use reth_transaction_pool::TransactionPool;
use std::collections::HashMap;
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// Cache limit of transactions to keep track of for a single peer.
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024;
/// The future for inserting a function into the pool
pub type PoolImportFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Api to interact with [`TransactionsManager`] task.
pub struct TransactionsHandle {
/// Command channel to the [`TransactionsManager`]
@ -39,10 +52,15 @@ pub struct TransactionsManager<Pool> {
///
/// From which we get all new incoming transaction related messages.
network_events: UnboundedReceiverStream<NetworkEvent>,
/// All currently pending transactions
pending_transactions: (),
/// All the peers that have sent the same transactions.
peers: HashMap<H256, Vec<()>>,
/// All currently pending transactions grouped by peers.
///
/// This way we can track incoming transactions and prevent multiple pool imports for the same
/// transaction
transactions_by_peers: HashMap<H256, Vec<PeerId>>,
/// Transactions that are currently imported into the `Pool`
pool_imports: FuturesUnordered<PoolImportFuture>,
/// All the connected peers.
peers: HashMap<PeerId, Peer>,
/// Send half for the command channel.
command_tx: mpsc::UnboundedSender<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
@ -64,7 +82,8 @@ where
pool,
network,
network_events: UnboundedReceiverStream::new(network_events),
pending_transactions: (),
transactions_by_peers: Default::default(),
pool_imports: Default::default(),
peers: Default::default(),
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
@ -76,10 +95,64 @@ where
TransactionsHandle { manager_tx: self.command_tx.clone() }
}
/// Handles a received event
async fn on_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionClosed { peer_id } => {
// remove the peer
self.peers.remove(&peer_id);
}
NetworkEvent::SessionEstablished { peer_id, messages, .. } => {
// insert a new peer
self.peers.insert(
peer_id,
Peer {
transactions: LruCache::new(
NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(),
),
request_tx: messages,
},
);
// TODO send `NewPooledTransactionHashes
}
NetworkEvent::IncomingTransactions { peer_id, msg } => {
let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone());
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions.0 {
// track that the peer knows this transaction
peer.transactions.insert(tx.hash);
match self.transactions_by_peers.entry(tx.hash) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
}
Entry::Vacant(_) => {
// TODO import into the pool
}
}
}
}
}
NetworkEvent::IncomingPooledTransactionHashes { .. } => {}
NetworkEvent::GetPooledTransactions { .. } => {}
}
}
/// Executes an endless future
pub async fn run(self) {}
}
/// Tracks a single peer
struct Peer {
/// Keeps track of transactions that we know the peer has seen.
transactions: LruCache<H256>,
/// A communication channel directly to the session task.
request_tx: PeerRequestSender,
}
/// Commands to send to the [`TransactionManager`]
enum TransactionsCommand {
Propagate(H256),