chore(net): extract NetworkHandle methods for launching node to traits (#9966)

Emilia Hane authored on 2024-08-01 18:44:23 +02:00, committed by GitHub
parent f3fac56fd9
commit b10517b3bf
56 changed files with 985 additions and 776 deletions

View File

@ -1,12 +1,14 @@
//! Builder support for configuring the entire setup.
use reth_network_api::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
use crate::{
eth_requests::EthRequestHandler,
transactions::{TransactionsManager, TransactionsManagerConfig},
NetworkHandle, NetworkManager,
};
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
/// We set the max channel capacity of the `EthRequestHandler` to 256.
/// 256 requests with malicious 10MB bodies amount to ~2.6GB, which the node can absorb.
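The arithmetic behind that bound, as a standalone sketch (the constant names below are illustrative, not the crate's own):

// 256 queued requests * 10 MB worst-case body each = 2.56 GB,
// i.e. the ~2.6GB figure the doc comment refers to.
const WORST_CASE_BODY_BYTES: usize = 10 * 1_000_000; // 10 MB per malicious request
const REQUEST_CHANNEL_CAPACITY: usize = 256;
const WORST_CASE_BUFFERED_BYTES: usize = REQUEST_CHANNEL_CAPACITY * WORST_CASE_BODY_BYTES;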

View File

@ -1,11 +1,12 @@
//! Network cache support
use core::hash::BuildHasher;
use std::{fmt, hash::Hash};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
// use linked_hash_set::LinkedHashSet;
use schnellru::{ByLength, Limiter, RandomState, Unlimited};
use std::{fmt, hash::Hash};
/// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity.
///
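For context, a minimal sketch of the underlying `schnellru` API that this crate-local `LruMap` wraps (standalone usage against `schnellru` itself, with an arbitrary capacity):

use schnellru::{ByLength, LruMap};

fn lru_sketch() {
    // Keep at most 3 entries; the least recently used entry is evicted first.
    let mut cache = LruMap::new(ByLength::new(3));
    cache.insert(1u32, "one");
    cache.insert(2u32, "two");
    assert_eq!(cache.get(&1), Some(&mut "one"));
}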

View File

@ -1,11 +1,7 @@
//! Network config support
use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use reth_chainspec::{ChainSpec, MAINNET};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::NetworkStackId;
@ -17,7 +13,13 @@ use reth_primitives::{ForkFilter, Head};
use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::SECP256K1;
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
// re-export for convenience
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols};

View File

@ -1,11 +1,13 @@
//! Discovery support for the network.
use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
manager::DiscoveredEvent,
peers::PeerAddr,
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use enr::Enr;
use futures::StreamExt;
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
@ -14,19 +16,19 @@ use reth_dns_discovery::{
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::PeerAddr;
use reth_primitives::{EnrForkIdEntry, ForkId};
use secp256k1::SecretKey;
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tracing::trace;
use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
manager::DiscoveredEvent,
};
/// Default max capacity for cache of discovered peers.
///
/// Default is 10 000 peers.

View File

@ -1,13 +1,15 @@
//! Possible errors when interacting with the network.
use crate::session::PendingSessionHandshakeError;
use std::{fmt, io, io::ErrorKind, net::SocketAddr};
use reth_dns_discovery::resolver::ResolveError;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_network_types::BackoffKind;
use std::{fmt, io, io::ErrorKind, net::SocketAddr};
use crate::session::PendingSessionHandshakeError;
/// Service kind.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]

View File

@ -1,9 +1,12 @@
//! Blocks/Headers management for the p2p network.
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics, peers::PeersHandle,
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
@ -12,17 +15,17 @@ use reth_eth_wire::{
};
use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_network_types::PeersHandle;
use reth_primitives::{BlockBody, BlockHashOrNumber, Header};
use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics,
};
// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
/// Maximum number of receipts to serve.

View File

@ -1,8 +1,11 @@
//! A client implementation that can interact with the network and download data.
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse, peers::PeersHandle};
use futures::{future, future::Either};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures::{future, future::Either};
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
@ -11,14 +14,12 @@ use reth_network_p2p::{
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_network_types::{PeersHandle, ReputationChangeKind};
use reth_primitives::{Header, B256};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Front-end API for fetching data from the network.
///

View File

@ -1,16 +1,9 @@
//! Fetch data from the network.
use crate::{message::BlockRequest, peers::PeersHandle};
use futures::StreamExt;
use reth_eth_wire::{GetBlockBodies, GetBlockHeaders};
use reth_network_p2p::{
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest,
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{BlockBody, Header, B256};
mod client;
pub use client::FetchClient;
use std::{
collections::{HashMap, VecDeque},
sync::{
@ -19,11 +12,21 @@ use std::{
},
task::{Context, Poll},
};
use futures::StreamExt;
use reth_eth_wire::{GetBlockBodies, GetBlockHeaders};
use reth_network_p2p::{
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest,
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_network_types::{PeersHandle, ReputationChangeKind};
use reth_primitives::{BlockBody, Header, B256};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
mod client;
pub use client::FetchClient;
use crate::message::BlockRequest;
/// Manages data fetching operations.
///

View File

@ -1,9 +1,10 @@
use futures::Future;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::Future;
use pin_project::pin_project;
use tokio::sync::oneshot::{error::RecvError, Receiver};
/// Flatten a [Receiver] message in order to get rid of the [RecvError] result
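The underlying idea, as a rough standalone sketch (not the crate's actual implementation, which is a named future): collapse the nested result from awaiting a oneshot receiver into a single `Result`, assuming the error type can absorb a `RecvError`:

use tokio::sync::oneshot::{error::RecvError, Receiver};

async fn flatten<T, E>(rx: Receiver<Result<T, E>>) -> Result<T, E>
where
    E: From<RecvError>,
{
    match rx.await {
        Ok(inner) => inner,                     // forward the inner result untouched
        Err(recv_err) => Err(recv_err.into()),  // sender dropped: surface as `E`
    }
}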

View File

@ -1,9 +1,11 @@
//! This module provides an abstraction over block import in the form of the `BlockImport` trait.
use crate::message::NewBlockMessage;
use reth_network_peers::PeerId;
use std::task::{Context, Poll};
use reth_network_peers::PeerId;
use crate::message::NewBlockMessage;
/// Abstraction over block import.
pub trait BlockImport: std::fmt::Debug + Send + Sync {
/// Invoked for a received `NewBlock` broadcast message from the peer.

View File

@ -114,47 +114,49 @@
/// Common helpers for network testing.
pub mod test_utils;
mod budget;
mod builder;
pub mod cache;
pub mod config;
mod discovery;
pub mod error;
pub mod eth_requests;
mod fetch;
mod flattened_response;
pub mod import;
mod listener;
mod manager;
pub mod message;
mod metrics;
mod network;
pub mod peers;
pub mod protocol;
pub mod transactions;
mod budget;
mod builder;
mod discovery;
mod fetch;
mod flattened_response;
mod listener;
mod manager;
mod metrics;
mod network;
mod session;
mod state;
mod swarm;
pub mod transactions;
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
pub use reth_network_api::{NetworkInfo, Peers, PeersHandleProvider, PeersInfo};
pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
pub use reth_network_types::{PeersConfig, SessionsConfig};
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, EthRlpxConnection, PeerInfo,
PendingSessionEvent, PendingSessionHandle, PendingSessionHandshakeError, SessionCommand,
SessionEvent, SessionId, SessionManager,
};
pub use builder::NetworkBuilder;
pub use config::{NetworkConfig, NetworkConfigBuilder};
pub use discovery::{Discovery, DiscoveryEvent};
pub use fetch::FetchClient;
pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::{NetworkEvents, NetworkHandle, NetworkProtocols};
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent,
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionManager,
};
pub use flattened_response::FlattenedResponse;
pub use manager::DiscoveredEvent;
pub use manager::{DiscoveredEvent, NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use metrics::TxTypesCounter;
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
pub use reth_network_types::{PeersConfig, SessionsConfig};
pub use session::EthRlpxConnection;
pub use network::{
BlockDownloaderProvider, FullNetwork, NetworkEvents, NetworkHandle, NetworkProtocols,
};
pub use swarm::NetworkConnectionState;
pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68};

View File

@ -1,12 +1,13 @@
//! Contains connection-oriented interfaces.
use futures::{ready, Stream};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Stream};
use tokio::net::{TcpListener, TcpStream};
/// A tcp connection listener.

View File

@ -37,7 +37,7 @@ use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{EthProtocolInfo, NetworkStatus, PeerInfo};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_network_types::{PeerAddr, PeersHandle, ReputationChangeKind};
use reth_primitives::ForkId;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
@ -58,7 +58,7 @@ use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::{PeerAddr, PeersHandle, PeersManager},
peers::PeersManager,
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
session::SessionManager,

View File

@ -3,6 +3,12 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use std::{
fmt,
sync::Arc,
task::{ready, Context, Poll},
};
use futures::FutureExt;
use reth_eth_wire::{
capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage,
@ -15,11 +21,6 @@ use reth_network_peers::PeerId;
use reth_primitives::{
BlockBody, Bytes, Header, PooledTransactionsElement, ReceiptWithBloom, B256,
};
use std::{
fmt,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
/// Internal form of a `NewBlock` message

View File

@ -1,27 +1,3 @@
use crate::{
config::NetworkMode,
discovery::DiscoveryEvent,
manager::NetworkEvent,
message::PeerRequest,
peers::{PeerAddr, PeersHandle},
protocol::RlpxSubProtocol,
swarm::NetworkConnectionState,
transactions::TransactionsHandle,
FetchClient,
};
use enr::Enr;
use parking_lot::Mutex;
use reth_discv4::Discv4;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_network_api::{
NetworkError, NetworkInfo, NetworkStatus, PeerInfo, PeerKind, Peers, PeersInfo,
};
use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{Reputation, ReputationChangeKind};
use reth_primitives::{Head, TransactionSigned, B256};
use reth_tokio_util::{EventSender, EventStream};
use secp256k1::SecretKey;
use std::{
net::SocketAddr,
sync::{
@ -29,12 +5,58 @@ use std::{
Arc,
},
};
use enr::Enr;
use futures::Future;
use parking_lot::Mutex;
use reth_discv4::Discv4;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_network_api::{
NetworkError, NetworkInfo, NetworkStatus, PeerInfo, Peers, PeersHandleProvider, PeersInfo,
};
use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind, PeersHandle, Reputation, ReputationChangeKind};
use reth_primitives::{Head, TransactionSigned, B256};
use reth_tokio_util::{EventSender, EventStream};
use secp256k1::SecretKey;
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
protocol::RlpxSubProtocol, swarm::NetworkConnectionState, transactions::TransactionsHandle,
FetchClient,
};
/// Helper trait that unifies the network API needed to launch a node.
pub trait FullNetwork:
BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEvents
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
impl<T> FullNetwork for T where
T: BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEvents
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
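As a rough illustration of what the new umbrella bound buys (the helper below is hypothetical, not part of this commit): node-launch code can take one generic parameter instead of spelling out every network trait it relies on.

use reth_network::{BlockDownloaderProvider, FullNetwork};

// Hypothetical launch helper bounded by the unified trait.
async fn wire_network_components<N: FullNetwork>(network: N) {
    // `NetworkInfo` supertrait: where the node is listening.
    let _listen_addr = network.local_addr();
    // `NetworkEvents` supertrait: session lifecycle notifications.
    let _events = network.event_listener();
    // `BlockDownloaderProvider` supertrait: client for header/body downloads.
    let _fetch_client = network.fetch_client().await.expect("network manager is running");
}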
/// A _shareable_ network frontend. Used to interact with the network.
///
/// See also [`NetworkManager`](crate::NetworkManager).
@ -85,26 +107,10 @@ impl NetworkHandle {
&self.inner.local_peer_id
}
/// Returns the [`PeersHandle`] that can be cloned and shared.
///
/// The [`PeersHandle`] can be used to interact with the network's peer set.
pub fn peers_handle(&self) -> &PeersHandle {
&self.inner.peers
}
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage> {
&self.inner.to_manager_tx
}
/// Returns a new [`FetchClient`] that can be cloned and shared.
///
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
pub async fn fetch_client(&self) -> Result<FetchClient, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
rx.await
}
/// Returns the mode of the network, either PoW or PoS.
pub fn mode(&self) -> &NetworkMode {
&self.inner.network_mode
@ -328,6 +334,12 @@ impl Peers for NetworkHandle {
}
}
impl PeersHandleProvider for NetworkHandle {
fn peers_handle(&self) -> &PeersHandle {
&self.inner.peers
}
}
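A small usage sketch for the relocated handle access (the function name and address handling are illustrative):

use std::net::SocketAddr;

use reth_network_api::PeersHandleProvider;
use reth_network_peers::PeerId;

// Any handle implementing `PeersHandleProvider` can mutate the peer set.
fn add_peer_via_handle(network: &impl PeersHandleProvider, peer: PeerId, addr: SocketAddr) {
    // `PeersHandle::add_peer` forwards a command to the `PeersManager`.
    network.peers_handle().add_peer(peer, addr);
}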
impl NetworkInfo for NetworkHandle {
fn local_addr(&self) -> SocketAddr {
*self.inner.listener_address.lock()
@ -381,6 +393,25 @@ impl NetworkSyncUpdater for NetworkHandle {
}
}
/// Provides [`FetchClient`] for downloading blocks.
#[auto_impl::auto_impl(&, Arc)]
pub trait BlockDownloaderProvider {
/// Returns a new [`FetchClient`] that can be cloned and shared.
///
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
fn fetch_client(
&self,
) -> impl Future<Output = Result<FetchClient, oneshot::error::RecvError>> + Send;
}
impl BlockDownloaderProvider for NetworkHandle {
async fn fetch_client(&self) -> Result<FetchClient, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
rx.await
}
}
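A rough usage sketch, assuming an async context (error handling kept minimal):

use reth_network::BlockDownloaderProvider;

async fn start_block_downloads(network: impl BlockDownloaderProvider) {
    // The returned `FetchClient` can be cloned and handed to header/body downloaders.
    let fetch_client = network.fetch_client().await.expect("network manager dropped");
    let _ = fetch_client;
}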
#[derive(Debug)]
struct NetworkInner {
/// Number of active peer sessions the node's currently handling.
@ -412,6 +443,7 @@ struct NetworkInner {
}
/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEvents: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
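A sketch of consuming the stream returned by `event_listener` (the logging target is illustrative):

use futures::StreamExt;

async fn log_network_events(network: &impl NetworkEvents) {
    let mut events = network.event_listener();
    while let Some(event) = events.next().await {
        // `NetworkEvent` covers e.g. established and closed sessions.
        tracing::trace!(target: "net", ?event, "network event");
    }
}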

View File

@ -1,25 +1,5 @@
//! Peer related implementations
use crate::{
error::SessionError,
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_net_banlist::BanList;
use reth_network_api::PeerKind;
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{
peers::{
config::PeerBackoffDurations,
reputation::{
is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE,
},
},
ConnectionsConfig, PeersConfig, ReputationChangeKind, ReputationChangeWeights,
};
use reth_primitives::ForkId;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
@ -28,59 +8,33 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_net_banlist::BanList;
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{
peers::{
config::PeerBackoffDurations,
reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
},
ConnectionsConfig, Peer, PeerAddr, PeerCommand, PeerConnectionState, PeerKind, PeersConfig,
PeersHandle, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
};
use reth_primitives::ForkId;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
sync::mpsc,
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{trace, warn};
/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set.
#[derive(Clone, Debug)]
pub struct PeersHandle {
/// Sender half of command channel back to the [`PeersManager`]
manager_tx: mpsc::UnboundedSender<PeerCommand>,
}
// === impl PeersHandle ===
impl PeersHandle {
fn send(&self, cmd: PeerCommand) {
let _ = self.manager_tx.send(cmd);
}
/// Adds a peer to the set.
pub fn add_peer(&self, peer_id: PeerId, addr: SocketAddr) {
self.send(PeerCommand::Add(peer_id, addr));
}
/// Removes a peer from the set.
pub fn remove_peer(&self, peer_id: PeerId) {
self.send(PeerCommand::Remove(peer_id));
}
/// Send a reputation change for the given peer.
pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send(PeerCommand::ReputationChange(peer_id, kind));
}
/// Returns a peer by its [`PeerId`], or `None` if the peer is not in the peer set.
pub async fn peer_by_id(&self, peer_id: PeerId) -> Option<Peer> {
let (tx, rx) = oneshot::channel();
self.send(PeerCommand::GetPeer(peer_id, tx));
rx.await.unwrap_or(None)
}
/// Returns all peers in the peerset.
pub async fn all_peers(&self) -> Vec<NodeRecord> {
let (tx, rx) = oneshot::channel();
self.send(PeerCommand::GetPeers(tx));
rx.await.unwrap_or_default()
}
}
use crate::{
error::SessionError,
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
/// Maintains the state of _all_ the peers known to the network.
///
@ -198,7 +152,7 @@ impl PeersManager {
/// Returns a new [`PeersHandle`] that can send commands to this type.
pub(crate) fn handle(&self) -> PeersHandle {
PeersHandle { manager_tx: self.manager_tx.clone() }
PeersHandle::new(self.manager_tx.clone())
}
/// Returns the number of peers in the peer set
@ -211,9 +165,9 @@ impl PeersManager {
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| {
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
v.addr.tcp().ip(),
v.addr.tcp().port(),
v.addr.udp().map(|addr| addr.port()),
*peer_id,
)
})
@ -224,9 +178,9 @@ impl PeersManager {
self.peers.get(&peer_id).map(|v| {
(
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
v.addr.tcp().ip(),
v.addr.tcp().port(),
v.addr.udp().map(|addr| addr.port()),
peer_id,
),
v.kind,
@ -362,7 +316,7 @@ impl PeersManager {
Entry::Vacant(entry) => {
// peer is missing in the table, we add it but mark it as to be removed after
// disconnect, because we only know the outgoing port
let mut peer = Peer::with_state(PeerAddr::tcp(addr), PeerConnectionState::In);
let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
@ -715,7 +669,7 @@ impl PeersManager {
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
if self.ban_list.is_banned(&peer_id, &addr.tcp.ip()) {
if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
return
}
@ -734,7 +688,7 @@ impl PeersManager {
}
}
Entry::Vacant(entry) => {
trace!(target: "net::peers", ?peer_id, ?addr.tcp, "discovered new node");
trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
let mut peer = Peer::with_kind(addr, kind);
peer.fork_id = fork_id;
entry.insert(peer);
@ -849,7 +803,7 @@ impl PeersManager {
trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
peer.state = PeerConnectionState::PendingOut;
PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp }
PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
};
self.connection_info.inc_pending_out();
@ -887,7 +841,7 @@ impl PeersManager {
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd {
PeerCommand::Add(peer_id, addr) => {
self.add_peer(peer_id, PeerAddr::tcp(addr), None);
self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
}
PeerCommand::Remove(peer) => self.remove_peer(peer),
PeerCommand::ReputationChange(peer_id, rep) => {
@ -1019,240 +973,6 @@ impl ConnectionInfo {
}
}
/// Represents a peer's address information.
///
/// # Fields
///
/// - `tcp`: A `SocketAddr` representing the peer's data transfer address.
/// - `udp`: An optional `SocketAddr` representing the peer's discovery address. `None` if the peer
/// is connecting to us directly or the port is the same as `tcp`'s.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PeerAddr {
tcp: SocketAddr,
udp: Option<SocketAddr>,
}
impl PeerAddr {
/// Returns a new `PeerAddr` with the given `tcp` and `udp` addresses.
pub const fn new(tcp: SocketAddr, udp: Option<SocketAddr>) -> Self {
Self { tcp, udp }
}
/// Returns a new `PeerAddr` with a `tcp` address only.
pub const fn tcp(tcp: SocketAddr) -> Self {
Self { tcp, udp: None }
}
/// Returns a new `PeerAddr` with the given `tcp` and `udp` ports.
fn new_with_ports(ip: IpAddr, tcp_port: u16, udp_port: Option<u16>) -> Self {
let tcp = SocketAddr::new(ip, tcp_port);
let udp = udp_port.map(|port| SocketAddr::new(ip, port));
Self::new(tcp, udp)
}
}
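For reference, a short sketch of how the relocated `reth_network_types::PeerAddr` is used after this change (the address values are illustrative):

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use reth_network_types::PeerAddr;

fn peer_addr_sketch() {
    let tcp = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 30303);
    // `PeerAddr::from_tcp` replaces the old `PeerAddr::tcp` constructor.
    let addr = PeerAddr::from_tcp(tcp);
    // Field access now goes through the `tcp()`/`udp()` accessors.
    assert_eq!(addr.tcp(), tcp);
    assert_eq!(addr.udp(), None);
}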
/// Tracks info about a single peer.
#[derive(Debug, Clone)]
pub struct Peer {
/// Where to reach the peer.
addr: PeerAddr,
/// Reputation of the peer.
reputation: i32,
/// The state of the connection, if any.
state: PeerConnectionState,
/// The [`ForkId`] that the peer announced via discovery.
fork_id: Option<ForkId>,
/// Whether the entry should be removed after an existing session was terminated.
remove_after_disconnect: bool,
/// The kind of peer
kind: PeerKind,
/// Whether the peer is currently backed off.
backed_off: bool,
/// Counts number of times the peer was backed off due to a severe
/// [`reth_network_types::BackoffKind`].
severe_backoff_counter: u8,
}
// === impl Peer ===
impl Peer {
fn new(addr: PeerAddr) -> Self {
Self::with_state(addr, Default::default())
}
fn trusted(addr: PeerAddr) -> Self {
Self { kind: PeerKind::Trusted, ..Self::new(addr) }
}
/// Returns the reputation of the peer
pub const fn reputation(&self) -> i32 {
self.reputation
}
fn with_state(addr: PeerAddr, state: PeerConnectionState) -> Self {
Self {
addr,
state,
reputation: DEFAULT_REPUTATION,
fork_id: None,
remove_after_disconnect: false,
kind: Default::default(),
backed_off: false,
severe_backoff_counter: 0,
}
}
fn with_kind(addr: PeerAddr, kind: PeerKind) -> Self {
Self { kind, ..Self::new(addr) }
}
/// Resets the reputation of the peer to the default value. This always returns
/// [`ReputationChangeOutcome::None`].
fn reset_reputation(&mut self) -> ReputationChangeOutcome {
self.reputation = DEFAULT_REPUTATION;
ReputationChangeOutcome::None
}
/// Applies a reputation change to the peer and returns what action should be taken.
fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome {
let previous = self.reputation;
// we add the reputation since a negative reputation change decreases the total reputation
self.reputation = previous.saturating_add(reputation);
trace!(target: "net::peers", reputation=%self.reputation, banned=%self.is_banned(), "applied reputation change");
if self.state.is_connected() && self.is_banned() {
self.state.disconnect();
return ReputationChangeOutcome::DisconnectAndBan
}
if self.is_banned() && !is_banned_reputation(previous) {
return ReputationChangeOutcome::Ban
}
if !self.is_banned() && is_banned_reputation(previous) {
return ReputationChangeOutcome::Unban
}
ReputationChangeOutcome::None
}
/// Returns true if the peer's reputation is below the banned threshold.
#[inline]
const fn is_banned(&self) -> bool {
is_banned_reputation(self.reputation)
}
#[inline]
const fn is_backed_off(&self) -> bool {
self.backed_off
}
/// Unbans the peer by resetting its reputation
#[inline]
fn unban(&mut self) {
self.reputation = DEFAULT_REPUTATION
}
/// Returns whether this peer is trusted
#[inline]
const fn is_trusted(&self) -> bool {
matches!(self.kind, PeerKind::Trusted)
}
/// Returns whether this peer is static
#[inline]
const fn is_static(&self) -> bool {
matches!(self.kind, PeerKind::Static)
}
}
/// Outcomes when a reputation change is applied to a peer
enum ReputationChangeOutcome {
/// Nothing to do.
None,
/// Ban the peer.
Ban,
/// Ban and disconnect
DisconnectAndBan,
/// Unban the peer
Unban,
}
/// Represents the kind of connection established to the peer, if any
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum PeerConnectionState {
/// Not connected currently.
#[default]
Idle,
/// Disconnect of an incoming connection in progress
DisconnectingIn,
/// Disconnect of an outgoing connection in progress
DisconnectingOut,
/// Connected via incoming connection.
In,
/// Connected via outgoing connection.
Out,
/// Pending outgoing connection.
PendingOut,
}
// === impl PeerConnectionState ===
impl PeerConnectionState {
/// Sets the disconnect state
#[inline]
fn disconnect(&mut self) {
match self {
Self::In => *self = Self::DisconnectingIn,
Self::Out => *self = Self::DisconnectingOut,
_ => {}
}
}
/// Returns true if this is an active incoming connection.
#[inline]
const fn is_incoming(&self) -> bool {
matches!(self, Self::In)
}
/// Returns whether we're currently connected with this peer
#[inline]
const fn is_connected(&self) -> bool {
matches!(self, Self::In | Self::Out | Self::PendingOut)
}
/// Returns if there's currently no connection to that peer.
#[inline]
const fn is_unconnected(&self) -> bool {
matches!(self, Self::Idle)
}
/// Returns true if there's currently an outbound dial to that peer.
#[inline]
const fn is_pending_out(&self) -> bool {
matches!(self, Self::PendingOut)
}
}
/// Commands the [`PeersManager`] listens for.
#[derive(Debug)]
pub(crate) enum PeerCommand {
/// Command to manually add a peer
Add(PeerId, SocketAddr),
/// Remove a peer from the set
///
/// If currently connected this will disconnect the session
Remove(PeerId),
/// Apply a reputation change to the given peer.
ReputationChange(PeerId, ReputationChangeKind),
/// Get information about a peer
GetPeer(PeerId, oneshot::Sender<Option<Peer>>),
/// Get node information on all peers
GetPeers(oneshot::Sender<Vec<NodeRecord>>),
}
/// Actions the peer manager can trigger.
#[derive(Debug)]
pub enum PeerAction {
@ -1382,7 +1102,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1436,7 +1156,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
@ -1458,7 +1178,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
@ -1495,7 +1215,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::new(PeersConfig::test());
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1554,7 +1274,7 @@ mod tests {
let backoff_durations = PeerBackoffDurations::test();
let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1611,7 +1331,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test();
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
let backoff_timestamp = peers
@ -1628,7 +1348,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::default();
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
// Simulate a peer that was already backed off once
@ -1656,7 +1376,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1713,7 +1433,7 @@ mod tests {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test();
let mut peers = PeersManager::new(config.clone());
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
// Simulate a peer that was already backed off once
@ -1767,7 +1487,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1879,7 +1599,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1997,7 +1717,7 @@ mod tests {
// to increase by 1
peers.on_incoming_session_established(peer, socket_addr);
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr.tcp, socket_addr);
assert_eq!(p.addr.tcp(), socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
@ -2012,7 +1732,7 @@ mod tests {
peers.on_already_connected(Direction::Incoming);
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr.tcp, socket_addr);
assert_eq!(p.addr.tcp(), socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
}
@ -2022,7 +1742,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_trusted_peer(peer, PeerAddr::tcp(socket_addr));
peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2074,7 +1794,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
assert_eq!(peers.get_reputation(&peer), Some(0));
peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
@ -2089,7 +1809,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2126,7 +1846,7 @@ mod tests {
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
@ -2139,7 +1859,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2174,7 +1894,7 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::tcp(socket_addr), None);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2208,7 +1928,7 @@ mod tests {
let ban_list = BanList::new(HashSet::new(), vec![ip]);
let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
peer_manager.add_peer(B512::default(), PeerAddr::tcp(socket_addr), None);
peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
assert!(peer_manager.peers.is_empty());
}
@ -2311,7 +2031,7 @@ mod tests {
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None);
peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2351,7 +2071,7 @@ mod tests {
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None);
peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -2459,7 +2179,7 @@ mod tests {
let config = PeersConfig::test();
let mut peer_manager = PeersManager::new(config);
let peer_id = PeerId::random();
peer_manager.add_peer(peer_id, PeerAddr::tcp(socket_addr), None);
peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
tokio::time::sleep(Duration::from_secs(1)).await;
peer_manager.tick();
@ -2514,7 +2234,7 @@ mod tests {
assert!(peer.remove_after_disconnect);
// trigger discovery manually while the peer is still connected
peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
peers.on_active_session_gracefully_closed(peer_id);
@ -2530,7 +2250,7 @@ mod tests {
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
match event!(peers) {
PeerAction::PeerAdded(_) => {}
@ -2558,7 +2278,7 @@ mod tests {
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, PeerAddr::tcp(addr), None);
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
match event!(peers) {
PeerAction::PeerAdded(_) => {}
@ -2589,7 +2309,7 @@ mod tests {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008));
let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), peer_addr, None);
}
@ -2608,7 +2328,7 @@ mod tests {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008));
let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
// add more peers than allowed
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {

View File

@ -2,18 +2,19 @@
//!
//! See also <https://github.com/ethereum/devp2p/blob/master/README.md>
use std::{
fmt,
net::SocketAddr,
ops::{Deref, DerefMut},
pin::Pin,
};
use futures::Stream;
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network_api::{Direction, PeerId};
use reth_primitives::BytesMut;
use std::{
fmt,
net::SocketAddr,
ops::{Deref, DerefMut},
pin::Pin,
};
/// A trait that allows offering additional RLPx-based application-level protocols when establishing
/// a peer-to-peer connection.

View File

@ -1,14 +1,16 @@
//! Represents an established session.
use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
};
use core::sync::atomic::Ordering;
use std::{
collections::VecDeque,
future::Future,
net::SocketAddr,
pin::Pin,
sync::{atomic::AtomicU64, Arc},
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use futures::{stream::Fuse, SinkExt, StreamExt};
use reth_eth_wire::{
capability::Capabilities,
@ -21,15 +23,6 @@ use reth_network_p2p::error::RequestError;
use reth_network_peers::PeerId;
use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
use rustc_hash::FxHashMap;
use std::{
collections::VecDeque,
future::Future,
net::SocketAddr,
pin::Pin,
sync::{atomic::AtomicU64, Arc},
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use tokio::{
sync::{mpsc::error::TrySendError, oneshot},
time::Interval,
@ -38,6 +31,15 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tracing::{debug, trace};
use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
};
// Constants for timeout updating.
/// Minimum timeout value

View File

@ -1,5 +1,10 @@
//! Connection types for a session
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{Sink, Stream};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
@ -8,10 +13,6 @@ use reth_eth_wire::{
multiplex::{ProtocolProxy, RlpxSatelliteStream},
EthMessage, EthStream, EthVersion, P2PStream,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::net::TcpStream;
/// The type of the underlying peer network connection.

View File

@ -1,24 +1,27 @@
//! Session handles.
use crate::{
message::PeerMessage,
session::{conn::EthRlpxConnection, Direction, SessionId},
PendingSessionHandshakeError,
};
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use reth_ecies::ECIESError;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
DisconnectReason, EthVersion, Status,
};
use reth_network_api::{PeerInfo, PeerKind};
use reth_network_api::PeerInfo;
use reth_network_peers::{NodeRecord, PeerId};
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use reth_network_types::PeerKind;
use tokio::sync::{
mpsc::{self, error::SendError},
oneshot,
};
use crate::{
message::PeerMessage,
session::{conn::EthRlpxConnection, Direction, SessionId},
PendingSessionHandshakeError,
};
/// A handler attached to a peer session that is not yet authenticated, pending the handshake and
/// `Hello` message exchange, which negotiates the peer's `capabilities`.
///

View File

@ -1,12 +1,36 @@
//! Support for handling peer sessions.
use crate::{message::PeerMessage, metrics::SessionManagerMetrics, session::active::ActiveSession};
mod active;
mod conn;
mod counter;
mod handle;
pub use conn::EthRlpxConnection;
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};
pub use crate::message::PeerRequestSender;
pub use reth_network_api::{Direction, PeerInfo};
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
use counter::SessionCounter;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
multiplex::RlpxProtocolMultiplexer,
DisconnectReason, EthVersion, HelloMessageWithProtocols, Status, UnauthedEthStream,
UnauthedP2PStream,
};
@ -17,14 +41,6 @@ use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};
use reth_tasks::TaskSpawner;
use rustc_hash::FxHashMap;
use secp256k1::SecretKey;
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
@ -34,19 +50,13 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tracing::{debug, instrument, trace};
mod active;
mod conn;
mod counter;
mod handle;
pub use crate::message::PeerRequestSender;
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols};
pub use conn::EthRlpxConnection;
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
use crate::{
message::PeerMessage,
metrics::SessionManagerMetrics,
protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols},
session::active::ActiveSession,
};
use reth_eth_wire::multiplex::RlpxProtocolMultiplexer;
pub use reth_network_api::{Direction, PeerInfo};
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

View File

@ -1,25 +1,5 @@
//! Keeps track of the state of the network.
use crate::{
cache::LruCache,
discovery::{Discovery, DiscoveryEvent},
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
manager::DiscoveredEvent,
message::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
peers::{PeerAction, PeerAddr, PeersManager},
FetchClient,
};
use rand::seq::SliceRandom;
use reth_eth_wire::{
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
};
use reth_network_api::PeerKind;
use reth_network_peers::PeerId;
use reth_primitives::{ForkId, B256};
use std::{
collections::{HashMap, VecDeque},
fmt,
@ -31,9 +11,30 @@ use std::{
},
task::{Context, Poll},
};
use rand::seq::SliceRandom;
use reth_eth_wire::{
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
};
use reth_network_peers::PeerId;
use reth_network_types::{PeerAddr, PeerKind};
use reth_primitives::{ForkId, B256};
use tokio::sync::oneshot;
use tracing::{debug, trace};
use crate::{
cache::LruCache,
discovery::{Discovery, DiscoveryEvent},
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
manager::DiscoveredEvent,
message::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
peers::{PeerAction, PeersManager},
FetchClient,
};
/// Cache limit of blocks to keep track of for a single peer.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;

View File

@ -1,18 +1,3 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
peers::InboundConnectionError,
protocol::IntoRlpxSubProtocol,
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
};
use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
EthVersion, Status,
};
use reth_network_peers::PeerId;
use std::{
io,
net::SocketAddr,
@ -21,8 +6,24 @@ use std::{
task::{Context, Poll},
};
use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
EthVersion, Status,
};
use reth_network_peers::PeerId;
use tracing::trace;
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
peers::InboundConnectionError,
protocol::IntoRlpxSubProtocol,
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the connectivity related state of the network.
///

View File

@ -1,6 +1,7 @@
use std::{net::SocketAddr, time::Duration};
use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey};
use reth_network_peers::PeerId;
use std::{net::SocketAddr, time::Duration};
/// The timeout for tests that create a `GethInstance`
pub const GETH_TIMEOUT: Duration = Duration::from_secs(60);

View File

@ -1,21 +1,20 @@
//! A network implementation for testing purposes.
use crate::{
builder::ETH_REQUEST_CHANNEL_CAPACITY,
error::NetworkError,
eth_requests::EthRequestHandler,
peers::PeersHandle,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
use std::{
fmt,
future::Future,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
task::{Context, Poll},
};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_chainspec::MAINNET;
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_api::{NetworkInfo, Peers, PeersHandleProvider};
use reth_network_peers::PeerId;
use reth_network_types::PeersHandle;
use reth_provider::test_utils::NoopProvider;
use reth_storage_api::{BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
use reth_tasks::TokioTaskExecutor;
@ -26,13 +25,6 @@ use reth_transaction_pool::{
EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor,
};
use secp256k1::SecretKey;
use std::{
fmt,
future::Future,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
task::{Context, Poll},
};
use tokio::{
sync::{
mpsc::{channel, unbounded_channel},
@ -41,6 +33,16 @@ use tokio::{
task::JoinHandle,
};
use crate::{
builder::ETH_REQUEST_CHANNEL_CAPACITY,
error::NetworkError,
eth_requests::EthRequestHandler,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
};
/// A test network consisting of multiple peers.
pub struct Testnet<C, Pool> {
/// All running peers in the network.

View File

@ -1,8 +1,9 @@
use derive_more::Constructor;
use super::{
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use derive_more::Constructor;
/// Configuration for managing transactions within the network.
#[derive(Debug, Default, Clone)]

View File

@ -25,16 +25,15 @@
//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
//! enough to buffer many hashes during network failure, to allow for recovery.
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
message::PeerRequest,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
use std::{
collections::HashMap,
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use derive_more::{Constructor, Deref};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
@ -46,12 +45,6 @@ use reth_primitives::{PooledTransactionsElement, TxHash};
use schnellru::ByLength;
#[cfg(debug_assertions)]
use smallvec::{smallvec, SmallVec};
use std::{
collections::HashMap,
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
use validation::FilterOutcome;
@ -62,6 +55,13 @@ use super::{
MessageFilter, PeerMetadata, PooledTransactions,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
message::PeerRequest,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};
/// The type responsible for fetching missing transactions from peers.
///

View File

@ -1,5 +1,36 @@
//! Transactions management for the p2p network.
/// Aggregation on configurable parameters for [`TransactionsManager`].
pub mod config;
/// Default and spec'd bounds.
pub mod constants;
/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
pub mod fetcher;
pub mod validation;
pub use self::constants::{
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
pub use config::{TransactionFetcherConfig, TransactionsManagerConfig};
pub use validation::*;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
@ -21,16 +52,6 @@ use reth_transaction_pool::{
GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
TransactionPool, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace};
@ -50,25 +71,6 @@ use crate::{
NetworkEvents, NetworkHandle,
};
/// Aggregation on configurable parameters for [`TransactionsManager`].
pub mod config;
/// Default and spec'd bounds.
pub mod constants;
/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
pub mod fetcher;
pub mod validation;
pub use config::{TransactionFetcherConfig, TransactionsManagerConfig};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
pub use validation::*;
pub use self::constants::{
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
/// The future for importing transactions into the pool.
///
/// Resolves with the result of each transaction import.

View File

@ -1,5 +1,7 @@
//! Connection tests
use std::{collections::HashSet, net::SocketAddr, time::Duration};
use alloy_node_bindings::Geth;
use alloy_provider::{ext::AdminApi, ProviderBuilder};
use futures::StreamExt;
@ -8,7 +10,8 @@ use reth_eth_wire::{DisconnectReason, HeadersDirection};
use reth_net_banlist::BanList;
use reth_network::{
test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT},
NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, PeersConfig,
BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager,
PeersConfig,
};
use reth_network_api::{NetworkInfo, Peers, PeersInfo};
use reth_network_p2p::{
@ -19,7 +22,6 @@ use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::test_utils::testing_pool;
use secp256k1::SecretKey;
use std::{collections::HashSet, net::SocketAddr, time::Duration};
use tokio::task;
use url::Host;

View File

@ -1,7 +1,12 @@
#![allow(unreachable_pub)]
//! Testing gossiping of transactions.
use crate::multiplex::proto::{PingPongProtoMessage, PingPongProtoMessageKind};
use std::{
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use futures::{Stream, StreamExt};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
@ -13,14 +18,11 @@ use reth_network::{
use reth_network_api::{Direction, PeerId};
use reth_primitives::BytesMut;
use reth_provider::test_utils::MockEthProvider;
use std::{
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::multiplex::proto::{PingPongProtoMessage, PingPongProtoMessageKind};
/// A simple Rlpx subprotocol that sends pings and pongs
mod proto {
use super::*;

View File

@ -1,11 +1,13 @@
#![allow(unreachable_pub)]
//! Tests for eth related requests
use std::sync::Arc;
use rand::Rng;
use reth_eth_wire::HeadersDirection;
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
NetworkEvents,
BlockDownloaderProvider, NetworkEvents,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_p2p::{
@ -17,7 +19,6 @@ use reth_primitives::{
U256,
};
use reth_provider::test_utils::MockEthProvider;
use std::sync::Arc;
/// Returns a new [`TransactionSigned`] with some random parameters
pub fn rng_transaction(rng: &mut impl rand::RngCore) -> TransactionSigned {

View File

@ -1,3 +1,8 @@
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use reth_discv4::Discv4Config;
use reth_network::{
error::{NetworkError, ServiceKind},
@ -6,10 +11,6 @@ use reth_network::{
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_provider::test_utils::NoopProvider;
use secp256k1::SecretKey;
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use tokio::net::TcpListener;
fn is_addr_in_use_kind(err: &NetworkError, kind: ServiceKind) -> bool {

View File

@ -1,5 +1,7 @@
//! Testing gossiping of transactions.
use std::sync::Arc;
use futures::StreamExt;
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEvents};
@ -7,7 +9,6 @@ use reth_network_api::PeersInfo;
use reth_primitives::{TransactionSigned, TxLegacy, U256};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool};
use std::sync::Arc;
#[tokio::test(flavor = "multi_thread")]
async fn test_tx_gossip() {