refactor: extract configuration types to reth-network-types (#9136)

This commit is contained in:
Arsenii Kulikov
2024-06-27 13:33:13 +04:00
committed by GitHub
parent 9542f3bcf0
commit 18eef6a991
22 changed files with 584 additions and 504 deletions

View File

@ -12,7 +12,7 @@ workspace = true
[dependencies]
# reth
reth-network.workspace = true
reth-network-types = { workspace = true, features = ["serde"] }
reth-prune-types.workspace = true
# serde

View File

@ -1,6 +1,6 @@
//! Configuration files.
use reth_network::{PeersConfig, SessionsConfig};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_prune_types::PruneModes;
use serde::{Deserialize, Deserializer, Serialize};
use std::{

View File

@ -0,0 +1,30 @@
[package]
name = "reth-network-types"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Commonly used network types"
[lints]
workspace = true
[dependencies]
# reth
reth-network-api.workspace = true
reth-network-peers.workspace = true
reth-net-banlist.workspace = true
# io
serde = { workspace = true, optional = true }
humantime-serde = { workspace = true, optional = true }
serde_json = { workspace = true }
# misc
tracing.workspace = true
[features]
serde = ["dep:serde", "dep:humantime-serde"]
test-utils = []

View File

@ -0,0 +1,27 @@
/// Describes the type of backoff should be applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackoffKind {
/// Use the lowest configured backoff duration.
///
/// This applies to connection problems where there is a chance that they will be resolved
/// after the short duration.
Low,
/// Use a slightly higher duration to put a peer in timeout
///
/// This applies to more severe connection problems where there is a lower chance that they
/// will be resolved.
Medium,
/// Use the max configured backoff duration.
///
/// This is intended for spammers, or bad peers in general.
High,
}
// === impl BackoffKind ===
impl BackoffKind {
/// Returns true if the backoff is considered severe.
pub const fn is_severe(&self) -> bool {
matches!(self, Self::Medium | Self::High)
}
}

View File

@ -0,0 +1,24 @@
//! Commonly used networking types.
//!
//! ## Feature Flags
//!
//! - `serde` (default): Enable serde support
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
/// Types related to peering.
pub mod peers;
pub use peers::{ConnectionsConfig, PeersConfig, ReputationChangeWeights};
pub mod session;
pub use session::{SessionLimits, SessionsConfig};
/// [`BackoffKind`] definition.
mod backoff;
pub use backoff::BackoffKind;

View File

@ -0,0 +1,292 @@
//! Configuration for peering.
use crate::{BackoffKind, ReputationChangeWeights};
use reth_net_banlist::BanList;
use reth_network_peers::NodeRecord;
use std::{
collections::HashSet,
io::{self, ErrorKind},
path::Path,
time::Duration,
};
use tracing::info;
/// Maximum number of available slots for outbound sessions.
pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;
/// Maximum number of available slots for inbound sessions.
pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;
/// Maximum number of available slots for concurrent outgoing dials.
///
/// This restricts how many outbound dials can be performed concurrently.
pub const DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS: usize = 15;
/// The durations to use when a backoff should be applied to a peer.
///
/// See also [`BackoffKind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PeerBackoffDurations {
/// Applies to connection problems where there is a chance that they will be resolved after the
/// short duration.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub low: Duration,
/// Applies to more severe connection problems where there is a lower chance that they will be
/// resolved.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub medium: Duration,
/// Intended for spammers, or bad peers in general.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub high: Duration,
/// Maximum total backoff duration.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub max: Duration,
}
impl PeerBackoffDurations {
/// Returns the corresponding [`Duration`]
pub const fn backoff(&self, kind: BackoffKind) -> Duration {
match kind {
BackoffKind::Low => self.low,
BackoffKind::Medium => self.medium,
BackoffKind::High => self.high,
}
}
/// Returns the timestamp until which we should backoff.
///
/// The Backoff duration is capped by the configured maximum backoff duration.
pub fn backoff_until(&self, kind: BackoffKind, backoff_counter: u8) -> std::time::Instant {
let backoff_time = self.backoff(kind);
let backoff_time = backoff_time + backoff_time * backoff_counter as u32;
let now = std::time::Instant::now();
now + backoff_time.min(self.max)
}
/// Returns durations for testing.
#[cfg(any(test, feature = "test-utils"))]
pub const fn test() -> Self {
Self {
low: Duration::from_millis(200),
medium: Duration::from_millis(200),
high: Duration::from_millis(200),
max: Duration::from_millis(200),
}
}
}
impl Default for PeerBackoffDurations {
fn default() -> Self {
Self {
low: Duration::from_secs(30),
// 3min
medium: Duration::from_secs(60 * 3),
// 15min
high: Duration::from_secs(60 * 15),
// 1h
max: Duration::from_secs(60 * 60),
}
}
}
/// Tracks stats about connected nodes
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))]
pub struct ConnectionsConfig {
/// Maximum allowed outbound connections.
pub max_outbound: usize,
/// Maximum allowed inbound connections.
pub max_inbound: usize,
/// Maximum allowed concurrent outbound dials.
#[cfg_attr(feature = "serde", serde(default))]
pub max_concurrent_outbound_dials: usize,
}
impl Default for ConnectionsConfig {
fn default() -> Self {
Self {
max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize,
max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize,
max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS,
}
}
}
/// Config type for initiating a `PeersManager` instance.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct PeersConfig {
/// How often to recheck free slots for outbound connections.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub refill_slots_interval: Duration,
/// Trusted nodes to connect to or accept from
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to or accept from trusted nodes only?
#[cfg_attr(feature = "serde", serde(alias = "connect_trusted_nodes_only"))]
pub trusted_nodes_only: bool,
/// Maximum number of backoff attempts before we give up on a peer and dropping.
///
/// The max time spent of a peer before it's removed from the set is determined by the
/// configured backoff duration and the max backoff count.
///
/// With a backoff counter of 5 and a backoff duration of 1h, the minimum time spent of the
/// peer in the table is the sum of all backoffs (1h + 2h + 3h + 4h + 5h = 15h).
///
/// Note: this does not apply to trusted peers.
pub max_backoff_count: u8,
/// Basic nodes to connect to.
#[cfg_attr(feature = "serde", serde(skip))]
pub basic_nodes: HashSet<NodeRecord>,
/// How long to ban bad peers.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub ban_duration: Duration,
/// Restrictions on `PeerIds` and Ips.
#[cfg_attr(feature = "serde", serde(skip))]
pub ban_list: BanList,
/// Restrictions on connections.
pub connection_info: ConnectionsConfig,
/// How to weigh reputation changes.
pub reputation_weights: ReputationChangeWeights,
/// How long to backoff peers that we are failed to connect to for non-fatal reasons.
///
/// The backoff duration increases with number of backoff attempts.
pub backoff_durations: PeerBackoffDurations,
}
impl Default for PeersConfig {
fn default() -> Self {
Self {
refill_slots_interval: Duration::from_millis(5_000),
connection_info: Default::default(),
reputation_weights: Default::default(),
ban_list: Default::default(),
// Ban peers for 12h
ban_duration: Duration::from_secs(60 * 60 * 12),
backoff_durations: Default::default(),
trusted_nodes: Default::default(),
trusted_nodes_only: false,
basic_nodes: Default::default(),
max_backoff_count: 5,
}
}
}
impl PeersConfig {
/// A set of `peer_ids` and ip addr that we want to never connect to
pub fn with_ban_list(mut self, ban_list: BanList) -> Self {
self.ban_list = ban_list;
self
}
/// Configure how long to ban bad peers
pub const fn with_ban_duration(mut self, ban_duration: Duration) -> Self {
self.ban_duration = ban_duration;
self
}
/// Maximum allowed outbound connections.
pub const fn with_max_outbound(mut self, max_outbound: usize) -> Self {
self.connection_info.max_outbound = max_outbound;
self
}
/// Maximum allowed inbound connections with optional update.
pub const fn with_max_inbound_opt(mut self, max_inbound: Option<usize>) -> Self {
if let Some(max_inbound) = max_inbound {
self.connection_info.max_inbound = max_inbound;
}
self
}
/// Maximum allowed outbound connections with optional update.
pub const fn with_max_outbound_opt(mut self, max_outbound: Option<usize>) -> Self {
if let Some(max_outbound) = max_outbound {
self.connection_info.max_outbound = max_outbound;
}
self
}
/// Maximum allowed inbound connections.
pub const fn with_max_inbound(mut self, max_inbound: usize) -> Self {
self.connection_info.max_inbound = max_inbound;
self
}
/// Maximum allowed concurrent outbound dials.
pub const fn with_max_concurrent_dials(mut self, max_concurrent_outbound_dials: usize) -> Self {
self.connection_info.max_concurrent_outbound_dials = max_concurrent_outbound_dials;
self
}
/// Nodes to always connect to.
pub fn with_trusted_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.trusted_nodes = nodes;
self
}
/// Connect only to trusted nodes.
pub const fn with_trusted_nodes_only(mut self, trusted_only: bool) -> Self {
self.trusted_nodes_only = trusted_only;
self
}
/// Nodes available at launch.
pub fn with_basic_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.basic_nodes = nodes;
self
}
/// Configures the max allowed backoff count.
pub const fn with_max_backoff_count(mut self, max_backoff_count: u8) -> Self {
self.max_backoff_count = max_backoff_count;
self
}
/// Configures how to weigh reputation changes.
pub const fn with_reputation_weights(
mut self,
reputation_weights: ReputationChangeWeights,
) -> Self {
self.reputation_weights = reputation_weights;
self
}
/// Configures how long to backoff peers that are we failed to connect to for non-fatal reasons
pub const fn with_backoff_durations(mut self, backoff_durations: PeerBackoffDurations) -> Self {
self.backoff_durations = backoff_durations;
self
}
/// Returns the maximum number of peers, inbound and outbound.
pub const fn max_peers(&self) -> usize {
self.connection_info.max_outbound + self.connection_info.max_inbound
}
/// Read from file nodes available at launch. Ignored if None.
pub fn with_basic_nodes_from_file(
self,
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let reader = match std::fs::File::open(file_path.as_ref()) {
Ok(file) => io::BufReader::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => Err(e)?,
};
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
let nodes: HashSet<NodeRecord> = serde_json::from_reader(reader)?;
Ok(self.with_basic_nodes(nodes))
}
/// Returns settings for testing
#[cfg(any(test, feature = "test-utils"))]
pub fn test() -> Self {
Self {
refill_slots_interval: Duration::from_millis(100),
backoff_durations: PeerBackoffDurations::test(),
..Default::default()
}
}
}

View File

@ -0,0 +1,5 @@
pub mod reputation;
pub use reputation::ReputationChangeWeights;
pub mod config;
pub use config::{ConnectionsConfig, PeersConfig};

View File

@ -3,13 +3,13 @@
use reth_network_api::{Reputation, ReputationChangeKind};
/// The default reputation of a peer
pub(crate) const DEFAULT_REPUTATION: Reputation = 0;
pub const DEFAULT_REPUTATION: Reputation = 0;
/// The minimal unit we're measuring reputation
const REPUTATION_UNIT: i32 = -1024;
/// The reputation value below which new connection from/to peers are rejected.
pub(crate) const BANNED_REPUTATION: i32 = 50 * REPUTATION_UNIT;
pub const BANNED_REPUTATION: i32 = 50 * REPUTATION_UNIT;
/// The reputation change to apply to a peer that dropped the connection.
const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = 4 * REPUTATION_UNIT;
@ -42,11 +42,11 @@ const BAD_ANNOUNCEMENT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT;
/// This gives a trusted peer more leeway when interacting with the node, which is useful for in
/// custom setups. By not setting this to `0` we still allow trusted peer penalization but less than
/// untrusted peers.
pub(crate) const MAX_TRUSTED_PEER_REPUTATION_CHANGE: Reputation = 2 * REPUTATION_UNIT;
pub const MAX_TRUSTED_PEER_REPUTATION_CHANGE: Reputation = 2 * REPUTATION_UNIT;
/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) const fn is_banned_reputation(reputation: i32) -> bool {
pub const fn is_banned_reputation(reputation: i32) -> bool {
reputation < BANNED_REPUTATION
}
@ -80,7 +80,7 @@ pub struct ReputationChangeWeights {
impl ReputationChangeWeights {
/// Returns the quantifiable [`ReputationChange`] for the given [`ReputationChangeKind`] using
/// the configured weights
pub(crate) fn change(&self, kind: ReputationChangeKind) -> ReputationChange {
pub fn change(&self, kind: ReputationChangeKind) -> ReputationChange {
match kind {
ReputationChangeKind::BadMessage => self.bad_message.into(),
ReputationChangeKind::BadBlock => self.bad_block.into(),
@ -115,14 +115,14 @@ impl Default for ReputationChangeWeights {
/// Represents a change in a peer's reputation.
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct ReputationChange(Reputation);
pub struct ReputationChange(Reputation);
// === impl ReputationChange ===
impl ReputationChange {
/// Helper type for easier conversion
#[inline]
pub(crate) const fn as_i32(self) -> Reputation {
pub const fn as_i32(self) -> Reputation {
self.0
}
}

View File

@ -1,9 +1,6 @@
//! Configuration types for [`SessionManager`](crate::session::SessionManager).
//! Configuration types for peer sessions manager.
use crate::{
peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND},
session::{Direction, ExceedsSessionLimit},
};
use crate::peers::config::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND};
use std::time::Duration;
/// Default request timeout for a single request.
@ -29,7 +26,7 @@ const DEFAULT_MAX_PEERS: usize =
/// With maxed out peers, this will allow for 3 messages per session (average)
const DEFAULT_SESSION_EVENT_BUFFER_SIZE: usize = DEFAULT_MAX_PEERS * 2;
/// Configuration options when creating a [`SessionManager`](crate::session::SessionManager).
/// Configuration options for peer session management.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
@ -111,10 +108,14 @@ impl SessionsConfig {
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SessionLimits {
max_pending_inbound: Option<u32>,
max_pending_outbound: Option<u32>,
max_established_inbound: Option<u32>,
max_established_outbound: Option<u32>,
/// Maximum allowed inbound connections.
pub max_pending_inbound: Option<u32>,
/// Maximum allowed outbound connections.
pub max_pending_outbound: Option<u32>,
/// Maximum allowed established inbound connections.
pub max_established_inbound: Option<u32>,
/// Maximum allowed established outbound connections.
pub max_established_outbound: Option<u32>,
}
impl SessionLimits {
@ -143,107 +144,10 @@ impl SessionLimits {
}
}
/// Keeps track of all sessions.
#[derive(Debug, Clone)]
pub struct SessionCounter {
/// Limits to enforce.
limits: SessionLimits,
/// Number of pending incoming sessions.
pending_inbound: u32,
/// Number of pending outgoing sessions.
pending_outbound: u32,
/// Number of active inbound sessions.
active_inbound: u32,
/// Number of active outbound sessions.
active_outbound: u32,
}
// === impl SessionCounter ===
impl SessionCounter {
pub(crate) const fn new(limits: SessionLimits) -> Self {
Self {
limits,
pending_inbound: 0,
pending_outbound: 0,
active_inbound: 0,
active_outbound: 0,
}
}
pub(crate) fn inc_pending_inbound(&mut self) {
self.pending_inbound += 1;
}
pub(crate) fn inc_pending_outbound(&mut self) {
self.pending_outbound += 1;
}
pub(crate) fn dec_pending(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.pending_outbound -= 1;
}
Direction::Incoming => {
self.pending_inbound -= 1;
}
}
}
pub(crate) fn inc_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound += 1;
}
Direction::Incoming => {
self.active_inbound += 1;
}
}
}
pub(crate) fn dec_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound -= 1;
}
Direction::Incoming => {
self.active_inbound -= 1;
}
}
}
pub(crate) const fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_outbound, self.limits.max_pending_outbound)
}
pub(crate) const fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_inbound, self.limits.max_pending_inbound)
}
const fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ExceedsSessionLimit(limit))
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_limits() {
let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2));
assert!(limits.ensure_pending_outbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_err());
}
#[test]
fn scale_session_event_buffer() {
let config = SessionsConfig::default().with_upscaled_event_buffer(10);

View File

@ -0,0 +1,4 @@
//! Peer sessions configuration.
pub mod config;
pub use config::{SessionLimits, SessionsConfig};

View File

@ -29,6 +29,7 @@ reth-provider.workspace = true
reth-tokio-util.workspace = true
reth-consensus.workspace = true
reth-network-peers.workspace = true
reth-network-types.workspace = true
# ethereum
enr = { workspace = true, features = ["serde", "rust-secp256k1"] }
@ -75,6 +76,7 @@ reth-primitives = { workspace = true, features = ["test-utils"] }
# integration tests
reth-network = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-network-types = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
@ -95,8 +97,8 @@ criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
[features]
default = ["serde"]
geth-tests = []
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json"]
test-utils = ["reth-provider/test-utils", "dep:tempfile", "reth-transaction-pool/test-utils"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json", "reth-network-types/serde"]
test-utils = ["reth-provider/test-utils", "dep:tempfile", "reth-transaction-pool/test-utils", "reth-network-types/test-utils"]
[[bench]]
name = "bench"

View File

@ -3,8 +3,6 @@
use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
peers::PeersConfig,
session::SessionsConfig,
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
@ -17,6 +15,7 @@ use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status};
use reth_network_peers::{pk2id, PeerId};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_primitives::{ForkFilter, Head};
use reth_provider::{BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};

View File

@ -6,6 +6,7 @@ use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_network_types::BackoffKind;
use std::{fmt, io, io::ErrorKind, net::SocketAddr};
/// Service kind.
@ -104,34 +105,6 @@ pub(crate) trait SessionError: fmt::Debug + fmt::Display {
fn should_backoff(&self) -> Option<BackoffKind>;
}
/// Describes the type of backoff should be applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackoffKind {
/// Use the lowest configured backoff duration.
///
/// This applies to connection problems where there is a chance that they will be resolved
/// after the short duration.
Low,
/// Use a slightly higher duration to put a peer in timeout
///
/// This applies to more severe connection problems where there is a lower chance that they
/// will be resolved.
Medium,
/// Use the max configured backoff duration.
///
/// This is intended for spammers, or bad peers in general.
High,
}
// === impl BackoffKind ===
impl BackoffKind {
/// Returns true if the backoff is considered severe.
pub(crate) const fn is_severe(&self) -> bool {
matches!(self, Self::Medium | Self::High)
}
}
impl SessionError for EthStreamError {
fn merits_discovery_ban(&self) -> bool {
match self {

View File

@ -143,12 +143,12 @@ pub use fetch::FetchClient;
pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::{NetworkEvents, NetworkHandle, NetworkProtocols};
pub use peers::PeersConfig;
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent,
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionLimits, SessionManager, SessionsConfig,
SessionManager,
};
pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68};
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
pub use reth_network_types::{PeersConfig, SessionsConfig};

View File

@ -1,12 +1,7 @@
//! Peer related implementations
use crate::{
error::{BackoffKind, SessionError},
peers::{
reputation::{
is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE,
},
ReputationChangeWeights, DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS,
DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND,
},
error::SessionError,
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
@ -15,13 +10,21 @@ use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_net_banlist::BanList;
use reth_network_api::{PeerKind, ReputationChangeKind};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{
peers::{
config::PeerBackoffDurations,
reputation::{
is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE,
},
},
ConnectionsConfig, PeersConfig, ReputationChangeWeights,
};
use reth_primitives::ForkId;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
io::{self, ErrorKind},
io::{self},
net::{IpAddr, SocketAddr},
path::Path,
task::{Context, Poll},
time::Duration,
};
@ -31,7 +34,7 @@ use tokio::{
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{info, trace};
use tracing::trace;
/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set.
#[derive(Clone, Debug)]
@ -170,7 +173,7 @@ impl PeersManager {
reputation_weights,
refill_slots_interval: tokio::time::interval(refill_slots_interval),
release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
connection_info,
connection_info: ConnectionInfo::new(connection_info),
ban_list,
backed_off_peers: Default::default(),
ban_duration,
@ -238,9 +241,7 @@ impl PeersManager {
return Err(InboundConnectionError::IpBanned)
}
if (!self.connection_info.has_in_capacity() || self.connection_info.max_inbound == 0) &&
self.trusted_peer_ids.is_empty()
{
if !self.connection_info.has_in_capacity() && self.trusted_peer_ids.is_empty() {
// if we don't have any inbound slots and no trusted peers, we don't accept any new
// connections
return Err(InboundConnectionError::ExceedsCapacity)
@ -918,42 +919,37 @@ impl Default for PeersManager {
}
/// Tracks stats about connected nodes
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectionInfo {
/// Counter for currently occupied slots for active outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_outbound: usize,
/// Counter for pending outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_pending_out: usize,
/// Counter for currently occupied slots for active inbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_inbound: usize,
/// Counter for pending inbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_pending_in: usize,
/// Maximum allowed outbound connections.
max_outbound: usize,
/// Maximum allowed inbound connections.
max_inbound: usize,
/// Maximum allowed concurrent outbound dials.
#[cfg_attr(feature = "serde", serde(default))]
max_concurrent_outbound_dials: usize,
/// Restrictions on number of connections.
config: ConnectionsConfig,
}
// === impl ConnectionInfo ===
impl ConnectionInfo {
/// Returns a new [`ConnectionInfo`] with the given config.
const fn new(config: ConnectionsConfig) -> Self {
Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
}
/// Returns `true` if there's still capacity for a new outgoing connection.
const fn has_out_capacity(&self) -> bool {
self.num_pending_out < self.max_concurrent_outbound_dials &&
self.num_outbound < self.max_outbound
self.num_pending_out < self.config.max_concurrent_outbound_dials &&
self.num_outbound < self.config.max_outbound
}
/// Returns `true` if there's still capacity for a new incoming connection.
const fn has_in_capacity(&self) -> bool {
self.num_inbound < self.max_inbound
self.num_inbound < self.config.max_inbound
}
fn decr_state(&mut self, state: PeerConnectionState) {
@ -998,20 +994,6 @@ impl ConnectionInfo {
}
}
impl Default for ConnectionInfo {
fn default() -> Self {
Self {
num_outbound: 0,
num_inbound: 0,
max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize,
max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize,
max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS,
num_pending_out: 0,
num_pending_in: 0,
}
}
}
/// Tracks info about a single peer.
#[derive(Debug, Clone)]
pub struct Peer {
@ -1029,7 +1011,8 @@ pub struct Peer {
kind: PeerKind,
/// Whether the peer is currently backed off.
backed_off: bool,
/// Counts number of times the peer was backed off due to a severe [`BackoffKind`].
/// Counts number of times the peer was backed off due to a severe
/// [`reth_network_types::BackoffKind`].
severe_backoff_counter: u8,
}
@ -1263,265 +1246,6 @@ pub enum PeerAction {
PeerRemoved(PeerId),
}
/// Config type for initiating a [`PeersManager`] instance.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct PeersConfig {
/// How often to recheck free slots for outbound connections.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub refill_slots_interval: Duration,
/// Trusted nodes to connect to or accept from
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to or accept from trusted nodes only?
#[cfg_attr(feature = "serde", serde(alias = "connect_trusted_nodes_only"))]
pub trusted_nodes_only: bool,
/// Maximum number of backoff attempts before we give up on a peer and dropping.
///
/// The max time spent of a peer before it's removed from the set is determined by the
/// configured backoff duration and the max backoff count.
///
/// With a backoff counter of 5 and a backoff duration of 1h, the minimum time spent of the
/// peer in the table is the sum of all backoffs (1h + 2h + 3h + 4h + 5h = 15h).
///
/// Note: this does not apply to trusted peers.
pub max_backoff_count: u8,
/// Basic nodes to connect to.
#[cfg_attr(feature = "serde", serde(skip))]
pub basic_nodes: HashSet<NodeRecord>,
/// How long to ban bad peers.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub ban_duration: Duration,
/// Restrictions on `PeerIds` and Ips.
#[cfg_attr(feature = "serde", serde(skip))]
pub ban_list: BanList,
/// Restrictions on connections.
pub connection_info: ConnectionInfo,
/// How to weigh reputation changes.
pub reputation_weights: ReputationChangeWeights,
/// How long to backoff peers that are we failed to connect to for non-fatal reasons, such as
/// [`DisconnectReason::TooManyPeers`].
///
/// The backoff duration increases with number of backoff attempts.
pub backoff_durations: PeerBackoffDurations,
}
impl Default for PeersConfig {
fn default() -> Self {
Self {
refill_slots_interval: Duration::from_millis(5_000),
connection_info: Default::default(),
reputation_weights: Default::default(),
ban_list: Default::default(),
// Ban peers for 12h
ban_duration: Duration::from_secs(60 * 60 * 12),
backoff_durations: Default::default(),
trusted_nodes: Default::default(),
trusted_nodes_only: false,
basic_nodes: Default::default(),
max_backoff_count: 5,
}
}
}
impl PeersConfig {
/// A set of `peer_ids` and ip addr that we want to never connect to
pub fn with_ban_list(mut self, ban_list: BanList) -> Self {
self.ban_list = ban_list;
self
}
/// Configure how long to ban bad peers
pub const fn with_ban_duration(mut self, ban_duration: Duration) -> Self {
self.ban_duration = ban_duration;
self
}
/// Maximum occupied slots for outbound connections.
pub const fn with_max_pending_outbound(mut self, num_outbound: usize) -> Self {
self.connection_info.num_outbound = num_outbound;
self
}
/// Maximum occupied slots for inbound connections.
pub const fn with_max_pending_inbound(mut self, num_inbound: usize) -> Self {
self.connection_info.num_inbound = num_inbound;
self
}
/// Maximum allowed outbound connections.
pub const fn with_max_outbound(mut self, max_outbound: usize) -> Self {
self.connection_info.max_outbound = max_outbound;
self
}
/// Maximum allowed inbound connections with optional update.
pub const fn with_max_inbound_opt(mut self, max_inbound: Option<usize>) -> Self {
if let Some(max_inbound) = max_inbound {
self.connection_info.max_inbound = max_inbound;
}
self
}
/// Maximum allowed outbound connections with optional update.
pub const fn with_max_outbound_opt(mut self, max_outbound: Option<usize>) -> Self {
if let Some(max_outbound) = max_outbound {
self.connection_info.max_outbound = max_outbound;
}
self
}
/// Maximum allowed inbound connections.
pub const fn with_max_inbound(mut self, max_inbound: usize) -> Self {
self.connection_info.max_inbound = max_inbound;
self
}
/// Maximum allowed concurrent outbound dials.
pub const fn with_max_concurrent_dials(mut self, max_concurrent_outbound_dials: usize) -> Self {
self.connection_info.max_concurrent_outbound_dials = max_concurrent_outbound_dials;
self
}
/// Nodes to always connect to.
pub fn with_trusted_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.trusted_nodes = nodes;
self
}
/// Connect only to trusted nodes.
pub const fn with_trusted_nodes_only(mut self, trusted_only: bool) -> Self {
self.trusted_nodes_only = trusted_only;
self
}
/// Nodes available at launch.
pub fn with_basic_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.basic_nodes = nodes;
self
}
/// Configures the max allowed backoff count.
pub const fn with_max_backoff_count(mut self, max_backoff_count: u8) -> Self {
self.max_backoff_count = max_backoff_count;
self
}
/// Configures how to weigh reputation changes.
pub const fn with_reputation_weights(
mut self,
reputation_weights: ReputationChangeWeights,
) -> Self {
self.reputation_weights = reputation_weights;
self
}
/// Configures how long to backoff peers that are we failed to connect to for non-fatal reasons
pub const fn with_backoff_durations(mut self, backoff_durations: PeerBackoffDurations) -> Self {
self.backoff_durations = backoff_durations;
self
}
/// Returns the maximum number of peers, inbound and outbound.
pub const fn max_peers(&self) -> usize {
self.connection_info.max_outbound + self.connection_info.max_inbound
}
/// Read from file nodes available at launch. Ignored if None.
pub fn with_basic_nodes_from_file(
self,
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let reader = match std::fs::File::open(file_path.as_ref()) {
Ok(file) => io::BufReader::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => Err(e)?,
};
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
let nodes: HashSet<NodeRecord> = serde_json::from_reader(reader)?;
Ok(self.with_basic_nodes(nodes))
}
/// Returns settings for testing
#[cfg(test)]
fn test() -> Self {
Self {
refill_slots_interval: Duration::from_millis(100),
backoff_durations: PeerBackoffDurations::test(),
..Default::default()
}
}
}
/// The durations to use when a backoff should be applied to a peer.
///
/// See also [`BackoffKind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PeerBackoffDurations {
/// Applies to connection problems where there is a chance that they will be resolved after the
/// short duration.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub low: Duration,
/// Applies to more severe connection problems where there is a lower chance that they will be
/// resolved.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub medium: Duration,
/// Intended for spammers, or bad peers in general.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub high: Duration,
/// Maximum total backoff duration.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub max: Duration,
}
impl PeerBackoffDurations {
/// Returns the corresponding [`Duration`]
pub const fn backoff(&self, kind: BackoffKind) -> Duration {
match kind {
BackoffKind::Low => self.low,
BackoffKind::Medium => self.medium,
BackoffKind::High => self.high,
}
}
/// Returns the timestamp until which we should backoff.
///
/// The Backoff duration is capped by the configured maximum backoff duration.
pub fn backoff_until(&self, kind: BackoffKind, backoff_counter: u8) -> std::time::Instant {
let backoff_time = self.backoff(kind);
let backoff_time = backoff_time + backoff_time * backoff_counter as u32;
let now = std::time::Instant::now();
now + backoff_time.min(self.max)
}
/// Returns durations for testing.
#[cfg(test)]
const fn test() -> Self {
Self {
low: Duration::from_millis(200),
medium: Duration::from_millis(200),
high: Duration::from_millis(200),
max: Duration::from_millis(200),
}
}
}
impl Default for PeerBackoffDurations {
fn default() -> Self {
Self {
low: Duration::from_secs(30),
// 3min
medium: Duration::from_secs(60 * 3),
// 15min
high: Duration::from_secs(60 * 15),
// 1h
max: Duration::from_secs(60 * 60),
}
}
}
/// Error thrown when a incoming connection is rejected right away
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
@ -1541,11 +1265,9 @@ impl Display for InboundConnectionError {
mod tests {
use super::PeersManager;
use crate::{
error::BackoffKind,
peers::{
manager::{ConnectionInfo, PeerBackoffDurations, PeerConnectionState},
reputation::DEFAULT_REPUTATION,
InboundConnectionError, PeerAction,
ConnectionInfo, InboundConnectionError, PeerAction, PeerBackoffDurations,
PeerConnectionState,
},
session::PendingSessionHandshakeError,
PeersConfig,
@ -1558,6 +1280,7 @@ mod tests {
use reth_net_banlist::BanList;
use reth_network_api::{Direction, ReputationChangeKind};
use reth_network_peers::PeerId;
use reth_network_types::{peers::reputation::DEFAULT_REPUTATION, BackoffKind};
use reth_primitives::B512;
use std::{
collections::HashSet,
@ -2106,7 +1829,7 @@ mod tests {
peers.add_trusted_peer_id(trusted);
// saturate the inbound slots
for i in 0..peers.connection_info.max_inbound {
for i in 0..peers.connection_info.config.max_inbound {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
let peer_id = PeerId::random();
@ -2404,7 +2127,7 @@ mod tests {
match a {
Ok(_) => panic!(),
Err(err) => match err {
super::InboundConnectionError::IpBanned {} => {
InboundConnectionError::IpBanned {} => {
assert_eq!(peer_manager.connection_info.num_pending_in, 0)
}
_ => unreachable!(),
@ -2769,7 +2492,7 @@ mod tests {
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 {
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None);
}
@ -2779,7 +2502,7 @@ mod tests {
.iter()
.filter(|ev| matches!(ev, PeerAction::Connect { .. }))
.count();
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);
assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
}
#[tokio::test]
@ -2790,18 +2513,18 @@ mod tests {
let socket_addr = SocketAddr::new(ip, 8008);
// add more peers than allowed
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 {
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None);
}
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 {
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
match event!(peer_manager) {
PeerAction::PeerAdded(_) => {}
_ => unreachable!(),
}
}
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials {
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
match event!(peer_manager) {
PeerAction::Connect { .. } => {}
_ => unreachable!(),
@ -2813,7 +2536,7 @@ mod tests {
// all dialed connections should be in 'PendingOut' state
let dials = peer_manager.connection_info.num_pending_out;
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);
assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
let num_pendingout_states = peer_manager
.peers
@ -2823,7 +2546,7 @@ mod tests {
.collect::<Vec<PeerId>>();
assert_eq!(
num_pendingout_states.len(),
peer_manager.connection_info.max_concurrent_outbound_dials
peer_manager.connection_info.config.max_concurrent_outbound_dials
);
// establish dialed connections

View File

@ -1,20 +0,0 @@
//! Peer related implementations
mod manager;
mod reputation;
pub(crate) use manager::InboundConnectionError;
pub use manager::{ConnectionInfo, Peer, PeerAction, PeersConfig, PeersHandle, PeersManager};
pub use reputation::ReputationChangeWeights;
pub use reth_network_api::PeerKind;
/// Maximum number of available slots for outbound sessions.
pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;
/// Maximum number of available slots for inbound sessions.
pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;
/// Maximum number of available slots for concurrent outgoing dials.
///
/// This restricts how many outbound dials can be performed concurrently.
pub const DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS: usize = 15;

View File

@ -3,7 +3,6 @@
use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
session::{
config::INITIAL_REQUEST_TIMEOUT,
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
@ -20,6 +19,7 @@ use reth_eth_wire::{
use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_p2p::error::RequestError;
use reth_network_peers::PeerId;
use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
use rustc_hash::FxHashMap;
use std::{
collections::VecDeque,
@ -759,10 +759,7 @@ fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) ->
#[cfg(test)]
mod tests {
use super::*;
use crate::session::{
config::PROTOCOL_BREACH_REQUEST_TIMEOUT, handle::PendingSessionEvent,
start_pending_incoming_session,
};
use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session};
use reth_chainspec::MAINNET;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
@ -770,6 +767,7 @@ mod tests {
UnauthedEthStream, UnauthedP2PStream,
};
use reth_network_peers::pk2id;
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
use reth_primitives::{ForkFilter, Hardfork};
use secp256k1::{SecretKey, SECP256K1};
use tokio::{

View File

@ -0,0 +1,106 @@
use reth_network_api::Direction;
use reth_network_types::SessionLimits;
use super::ExceedsSessionLimit;
/// Keeps track of all sessions.
#[derive(Debug, Clone)]
pub struct SessionCounter {
/// Limits to enforce.
limits: SessionLimits,
/// Number of pending incoming sessions.
pending_inbound: u32,
/// Number of pending outgoing sessions.
pending_outbound: u32,
/// Number of active inbound sessions.
active_inbound: u32,
/// Number of active outbound sessions.
active_outbound: u32,
}
// === impl SessionCounter ===
impl SessionCounter {
pub(crate) const fn new(limits: SessionLimits) -> Self {
Self {
limits,
pending_inbound: 0,
pending_outbound: 0,
active_inbound: 0,
active_outbound: 0,
}
}
pub(crate) fn inc_pending_inbound(&mut self) {
self.pending_inbound += 1;
}
pub(crate) fn inc_pending_outbound(&mut self) {
self.pending_outbound += 1;
}
pub(crate) fn dec_pending(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.pending_outbound -= 1;
}
Direction::Incoming => {
self.pending_inbound -= 1;
}
}
}
pub(crate) fn inc_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound += 1;
}
Direction::Incoming => {
self.active_inbound += 1;
}
}
}
pub(crate) fn dec_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound -= 1;
}
Direction::Incoming => {
self.active_inbound -= 1;
}
}
}
pub(crate) const fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_outbound, self.limits.max_pending_outbound)
}
pub(crate) const fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_inbound, self.limits.max_pending_inbound)
}
const fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ExceedsSessionLimit(limit))
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_limits() {
let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2));
assert!(limits.ensure_pending_outbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_err());
}
}

View File

@ -1,10 +1,7 @@
//! Support for handling peer sessions.
use crate::{
message::PeerMessage,
metrics::SessionManagerMetrics,
session::{active::ActiveSession, config::SessionCounter},
};
use crate::{message::PeerMessage, metrics::SessionManagerMetrics, session::active::ActiveSession};
use counter::SessionCounter;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
@ -15,6 +12,7 @@ use reth_eth_wire::{
};
use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_peers::PeerId;
use reth_network_types::SessionsConfig;
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};
use reth_tasks::TaskSpawner;
use rustc_hash::FxHashMap;
@ -37,12 +35,11 @@ use tokio_util::sync::PollSender;
use tracing::{debug, instrument, trace};
mod active;
mod config;
mod conn;
mod counter;
mod handle;
pub use crate::message::PeerRequestSender;
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols};
pub use config::{SessionLimits, SessionsConfig};
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,

View File

@ -57,9 +57,9 @@ pub mod tx_manager {
/// Constants used by [`TransactionFetcher`](super::TransactionFetcher).
pub mod tx_fetcher {
use crate::{
peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND},
transactions::fetcher::TransactionFetcherInfo,
use crate::transactions::fetcher::TransactionFetcherInfo;
use reth_network_types::peers::config::{
DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND,
};
use super::{