diff --git a/Cargo.lock b/Cargo.lock index 0326a37f4..09bcccf65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7802,6 +7802,7 @@ dependencies = [ "reth-ecies", "reth-eth-wire", "reth-eth-wire-types", + "reth-ethereum-forks", "reth-fs-util", "reth-metrics", "reth-net-banlist", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index ad8e65dff..ab9e89c2c 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -23,6 +23,7 @@ reth-network-p2p.workspace = true reth-discv4.workspace = true reth-discv5.workspace = true reth-dns-discovery.workspace = true +reth-ethereum-forks.workspace = true reth-eth-wire.workspace = true reth-eth-wire-types.workspace = true reth-ecies.workspace = true diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index 31038906b..da003a2e2 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -1,15 +1,14 @@ //! Builder support for configuring the entire setup. -use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; -use reth_network_api::test_utils::PeersHandleProvider; -use reth_transaction_pool::TransactionPool; -use tokio::sync::mpsc; - use crate::{ eth_requests::EthRequestHandler, transactions::{TransactionsManager, TransactionsManagerConfig}, NetworkHandle, NetworkManager, }; +use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; +use reth_network_api::test_utils::PeersHandleProvider; +use reth_transaction_pool::TransactionPool; +use tokio::sync::mpsc; /// We set the max channel capacity of the `EthRequestHandler` to 256 /// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node. diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 758b49167..32389ec4b 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -1,11 +1,10 @@ //! Network cache support use core::hash::BuildHasher; -use std::{fmt, hash::Hash}; - use derive_more::{Deref, DerefMut}; use itertools::Itertools; use schnellru::{ByLength, Limiter, RandomState, Unlimited}; +use std::{fmt, hash::Hash}; /// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity. /// diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index db7b384c2..e54000895 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -1,27 +1,25 @@ //! Network config support -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; - -use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks}; -use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS}; -use reth_discv5::NetworkStackId; -use reth_dns_discovery::DnsDiscoveryConfig; -use reth_eth_wire::{ - EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status, -}; -use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer}; -use reth_network_types::{PeersConfig, SessionsConfig}; -use reth_primitives::{ForkFilter, Head}; -use reth_storage_api::{noop::NoopBlockReader, BlockNumReader, BlockReader, HeaderProvider}; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; -use secp256k1::SECP256K1; - use crate::{ error::NetworkError, import::{BlockImport, ProofOfStakeBlockImport}, transactions::TransactionsManagerConfig, NetworkHandle, NetworkManager, }; +use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks}; +use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS}; +use reth_discv5::NetworkStackId; +use reth_dns_discovery::DnsDiscoveryConfig; +use reth_eth_wire::{ + EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status, +}; +use reth_ethereum_forks::{ForkFilter, Head}; +use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer}; +use reth_network_types::{PeersConfig, SessionsConfig}; +use reth_storage_api::{noop::NoopBlockReader, BlockNumReader, BlockReader, HeaderProvider}; +use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use secp256k1::SECP256K1; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; // re-export for convenience use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols}; diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 5b2bb788f..c0b9ffa76 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -1,13 +1,9 @@ //! Discovery support for the network. -use std::{ - collections::VecDeque, - net::{IpAddr, SocketAddr}, - pin::Pin, - sync::Arc, - task::{ready, Context, Poll}, +use crate::{ + cache::LruMap, + error::{NetworkError, ServiceKind}, }; - use enr::Enr; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config}; @@ -15,20 +11,22 @@ use reth_discv5::{DiscoveredPeer, Discv5}; use reth_dns_discovery::{ DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, }; +use reth_ethereum_forks::{EnrForkIdEntry, ForkId}; use reth_network_api::{DiscoveredEvent, DiscoveryEvent}; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::PeerAddr; -use reth_primitives::{EnrForkIdEntry, ForkId}; use secp256k1::SecretKey; +use std::{ + collections::VecDeque, + net::{IpAddr, SocketAddr}, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, +}; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tracing::trace; -use crate::{ - cache::LruMap, - error::{NetworkError, ServiceKind}, -}; - /// Default max capacity for cache of discovered peers. /// /// Default is 10 000 peers. diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 2709c4a29..8156392b2 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -1,7 +1,6 @@ //! Possible errors when interacting with the network. -use std::{fmt, io, io::ErrorKind, net::SocketAddr}; - +use crate::session::PendingSessionHandshakeError; use reth_dns_discovery::resolver::ResolveError; use reth_ecies::ECIESErrorImpl; use reth_eth_wire::{ @@ -9,8 +8,7 @@ use reth_eth_wire::{ DisconnectReason, }; use reth_network_types::BackoffKind; - -use crate::session::PendingSessionHandshakeError; +use std::{fmt, io, io::ErrorKind, net::SocketAddr}; /// Service kind. #[derive(Debug, PartialEq, Eq, Copy, Clone)] diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 8121b9675..0f9348a42 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -1,12 +1,9 @@ //! Blocks/Headers management for the p2p network. -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::Duration, +use crate::{ + budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, + metrics::EthRequestHandlerMetrics, }; - use alloy_consensus::Header; use alloy_eips::BlockHashOrNumber; use alloy_rlp::Encodable; @@ -20,14 +17,15 @@ use reth_network_p2p::error::RequestResult; use reth_network_peers::PeerId; use reth_primitives::BlockBody; use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; use tokio::sync::{mpsc::Receiver, oneshot}; use tokio_stream::wrappers::ReceiverStream; -use crate::{ - budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, - metrics::EthRequestHandlerMetrics, -}; - // Limits: /// Maximum number of receipts to serve. diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 584c079b8..e24ea167f 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -1,10 +1,6 @@ //! A client implementation that can interact with the network and download data. -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - +use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse}; use alloy_primitives::B256; use futures::{future, future::Either}; use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; @@ -18,10 +14,12 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; -use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse}; - #[cfg_attr(doc, aquamarine::aquamarine)] /// Front-end API for fetching data from the network. /// diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 8af6300b7..c5474587a 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -4,15 +4,7 @@ mod client; pub use client::FetchClient; -use std::{ - collections::{HashMap, VecDeque}, - sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, -}; - +use crate::message::BlockRequest; use alloy_primitives::B256; use futures::StreamExt; use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives}; @@ -24,11 +16,17 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; +use std::{ + collections::{HashMap, VecDeque}, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::message::BlockRequest; - type InflightHeadersRequest = Request>>; type InflightBodiesRequest = Request, PeerRequestResult>>; diff --git a/crates/net/network/src/flattened_response.rs b/crates/net/network/src/flattened_response.rs index 78c3c35f5..df2a9db78 100644 --- a/crates/net/network/src/flattened_response.rs +++ b/crates/net/network/src/flattened_response.rs @@ -1,10 +1,9 @@ +use futures::Future; +use pin_project::pin_project; use std::{ pin::Pin, task::{Context, Poll}, }; - -use futures::Future; -use pin_project::pin_project; use tokio::sync::oneshot::{error::RecvError, Receiver}; /// Flatten a [Receiver] message in order to get rid of the [RecvError] result diff --git a/crates/net/network/src/import.rs b/crates/net/network/src/import.rs index 749b3c347..f63bf2dd7 100644 --- a/crates/net/network/src/import.rs +++ b/crates/net/network/src/import.rs @@ -1,10 +1,8 @@ //! This module provides an abstraction over block import in the form of the `BlockImport` trait. -use std::task::{Context, Poll}; - -use reth_network_peers::PeerId; - use crate::message::NewBlockMessage; +use reth_network_peers::PeerId; +use std::task::{Context, Poll}; /// Abstraction over block import. pub trait BlockImport: std::fmt::Debug + Send + Sync { diff --git a/crates/net/network/src/listener.rs b/crates/net/network/src/listener.rs index e5094f689..9fcc15a10 100644 --- a/crates/net/network/src/listener.rs +++ b/crates/net/network/src/listener.rs @@ -1,13 +1,12 @@ //! Contains connection-oriented interfaces. +use futures::{ready, Stream}; use std::{ io, net::SocketAddr, pin::Pin, task::{Context, Poll}, }; - -use futures::{ready, Stream}; use tokio::net::{TcpListener, TcpStream}; /// A tcp connection listener. diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 0738be1bc..c9caa4122 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -15,39 +15,6 @@ //! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections //! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the `RLPx` session. -use std::{ - net::SocketAddr, - path::Path, - pin::Pin, - sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use futures::{Future, StreamExt}; -use parking_lot::Mutex; -use reth_eth_wire::{ - capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives, - NetworkPrimitives, -}; -use reth_fs_util::{self as fs, FsPathError}; -use reth_metrics::common::mpsc::UnboundedMeteredSender; -use reth_network_api::{ - test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, -}; -use reth_network_peers::{NodeRecord, PeerId}; -use reth_network_types::ReputationChangeKind; -use reth_storage_api::BlockNumReader; -use reth_tasks::shutdown::GracefulShutdown; -use reth_tokio_util::EventSender; -use secp256k1::SecretKey; -use tokio::sync::mpsc::{self, error::TrySendError}; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, error, trace, warn}; - use crate::{ budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM}, config::NetworkConfig, @@ -68,6 +35,37 @@ use crate::{ transactions::NetworkTransactionEvent, FetchClient, NetworkBuilder, }; +use futures::{Future, StreamExt}; +use parking_lot::Mutex; +use reth_eth_wire::{ + capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives, + NetworkPrimitives, +}; +use reth_fs_util::{self as fs, FsPathError}; +use reth_metrics::common::mpsc::UnboundedMeteredSender; +use reth_network_api::{ + test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, +}; +use reth_network_peers::{NodeRecord, PeerId}; +use reth_network_types::ReputationChangeKind; +use reth_storage_api::BlockNumReader; +use reth_tasks::shutdown::GracefulShutdown; +use reth_tokio_util::EventSender; +use secp256k1::SecretKey; +use std::{ + net::SocketAddr, + path::Path, + pin::Pin, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::{Duration, Instant}, +}; +use tokio::sync::mpsc::{self, error::TrySendError}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, error, trace, warn}; #[cfg_attr(doc, aquamarine::aquamarine)] /// Manages the _entire_ state of the network. diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 304057741..4821e2592 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -3,11 +3,6 @@ //! An `RLPx` stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, -use std::{ - sync::Arc, - task::{ready, Context, Poll}, -}; - use alloy_consensus::BlockHeader; use alloy_primitives::{Bytes, B256}; use futures::FutureExt; @@ -20,6 +15,10 @@ use reth_eth_wire::{ use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; use reth_primitives::{PooledTransactionsElement, ReceiptWithBloom}; +use std::{ + sync::Arc, + task::{ready, Context, Poll}, +}; use tokio::sync::oneshot; /// Internal form of a `NewBlock` message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 2fa3fd90e..0af0cb1ad 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,11 +1,7 @@ -use std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - Arc, - }, +use crate::{ + config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, + transactions::TransactionsHandle, FetchClient, }; - use alloy_primitives::B256; use enr::Enr; use parking_lot::Mutex; @@ -15,6 +11,7 @@ use reth_eth_wire::{ DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock, NewPooledTransactionHashes, SharedTransactions, }; +use reth_ethereum_forks::Head; use reth_network_api::{ test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, @@ -24,20 +21,22 @@ use reth_network_api::{ use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind}; -use reth_primitives::{Head, TransactionSigned}; +use reth_primitives::TransactionSigned; use reth_tokio_util::{EventSender, EventStream}; use secp256k1::SecretKey; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, + }, +}; use tokio::sync::{ mpsc::{self, UnboundedSender}, oneshot, }; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::{ - config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, - transactions::TransactionsHandle, FetchClient, -}; - /// A _shareable_ network frontend. Used to interact with the network. /// /// See also [`NetworkManager`](crate::NetworkManager). diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 4855ff5e7..d4b762e3e 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -1,16 +1,13 @@ //! Peer related implementations -use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt::Display, - io::{self}, - net::{IpAddr, SocketAddr}, - task::{Context, Poll}, - time::Duration, +use crate::{ + error::SessionError, + session::{Direction, PendingSessionHandshakeError}, + swarm::NetworkConnectionState, }; - use futures::StreamExt; use reth_eth_wire::{errors::EthStreamError, DisconnectReason}; +use reth_ethereum_forks::ForkId; use reth_net_banlist::BanList; use reth_network_api::test_utils::{PeerCommand, PeersHandle}; use reth_network_peers::{NodeRecord, PeerId}; @@ -22,7 +19,14 @@ use reth_network_types::{ ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights, }; -use reth_primitives::ForkId; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + fmt::Display, + io::{self}, + net::{IpAddr, SocketAddr}, + task::{Context, Poll}, + time::Duration, +}; use thiserror::Error; use tokio::{ sync::mpsc, @@ -31,12 +35,6 @@ use tokio::{ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{trace, warn}; -use crate::{ - error::SessionError, - session::{Direction, PendingSessionHandshakeError}, - swarm::NetworkConnectionState, -}; - /// Maintains the state of _all_ the peers known to the network. /// /// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`]. diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs index eeffd1c95..aa0749c2c 100644 --- a/crates/net/network/src/protocol.rs +++ b/crates/net/network/src/protocol.rs @@ -2,19 +2,18 @@ //! //! See also -use std::{ - fmt, - net::SocketAddr, - ops::{Deref, DerefMut}, - pin::Pin, -}; - use alloy_primitives::bytes::BytesMut; use futures::Stream; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network_api::{Direction, PeerId}; +use std::{ + fmt, + net::SocketAddr, + ops::{Deref, DerefMut}, + pin::Pin, +}; /// A trait that allows to offer additional RLPx-based application-level protocols when establishing /// a peer-to-peer connection. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f979a912c..76701f7e2 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -11,6 +11,14 @@ use std::{ time::{Duration, Instant}, }; +use crate::{ + message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult}, + session::{ + conn::EthRlpxConnection, + handle::{ActiveSessionMessage, SessionCommand}, + SessionId, + }, +}; use alloy_primitives::Sealable; use futures::{stream::Fuse, SinkExt, StreamExt}; use metrics::Gauge; @@ -34,15 +42,6 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; use tracing::{debug, trace}; -use crate::{ - message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult}, - session::{ - conn::EthRlpxConnection, - handle::{ActiveSessionMessage, SessionCommand}, - SessionId, - }, -}; - // Constants for timeout updating. /// Minimum timeout value diff --git a/crates/net/network/src/session/conn.rs b/crates/net/network/src/session/conn.rs index 5329f0102..45b83d1c4 100644 --- a/crates/net/network/src/session/conn.rs +++ b/crates/net/network/src/session/conn.rs @@ -1,10 +1,5 @@ //! Connection types for a session -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - use futures::{Sink, Stream}; use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ @@ -13,6 +8,10 @@ use reth_eth_wire::{ multiplex::{ProtocolProxy, RlpxSatelliteStream}, EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream, }; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; use tokio::net::TcpStream; /// The type of the underlying peer network connection. diff --git a/crates/net/network/src/session/counter.rs b/crates/net/network/src/session/counter.rs index 0d8f764f2..052cf1e25 100644 --- a/crates/net/network/src/session/counter.rs +++ b/crates/net/network/src/session/counter.rs @@ -1,8 +1,7 @@ +use super::ExceedsSessionLimit; use reth_network_api::Direction; use reth_network_types::SessionLimits; -use super::ExceedsSessionLimit; - /// Keeps track of all sessions. #[derive(Debug, Clone)] pub struct SessionCounter { diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index f80428630..d167dc0e6 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -1,7 +1,10 @@ //! Session handles. -use std::{io, net::SocketAddr, sync::Arc, time::Instant}; - +use crate::{ + message::PeerMessage, + session::{conn::EthRlpxConnection, Direction, SessionId}, + PendingSessionHandshakeError, +}; use reth_ecies::ECIESError; use reth_eth_wire::{ capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason, @@ -10,17 +13,12 @@ use reth_eth_wire::{ use reth_network_api::PeerInfo; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::PeerKind; +use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use tokio::sync::{ mpsc::{self, error::SendError}, oneshot, }; -use crate::{ - message::PeerMessage, - session::{conn::EthRlpxConnection, Direction, SessionId}, - PendingSessionHandshakeError, -}; - /// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello /// message which exchanges the `capabilities` of the peer. /// diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index a95f0e889..816c540ce 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -23,6 +23,12 @@ use std::{ time::{Duration, Instant}, }; +use crate::{ + message::PeerMessage, + metrics::SessionManagerMetrics, + protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols}, + session::active::ActiveSession, +}; use counter::SessionCounter; use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; @@ -31,11 +37,11 @@ use reth_eth_wire::{ Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives, Status, UnauthedEthStream, UnauthedP2PStream, }; +use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head}; use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_api::{PeerRequest, PeerRequestSender}; use reth_network_peers::PeerId; use reth_network_types::SessionsConfig; -use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head}; use reth_tasks::TaskSpawner; use rustc_hash::FxHashMap; use secp256k1::SecretKey; @@ -48,13 +54,6 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; use tracing::{debug, instrument, trace}; -use crate::{ - message::PeerMessage, - metrics::SessionManagerMetrics, - protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols}, - session::active::ActiveSession, -}; - /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] pub struct SessionId(usize); diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index c51f115c5..473c76c26 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -1,5 +1,25 @@ //! Keeps track of the state of the network. +use crate::{ + cache::LruCache, + discovery::Discovery, + fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, + message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult}, + peers::{PeerAction, PeersManager}, + FetchClient, +}; +use alloy_consensus::BlockHeader; +use alloy_primitives::B256; +use rand::seq::SliceRandom; +use reth_eth_wire::{ + BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, + NewBlockHashes, Status, +}; +use reth_ethereum_forks::ForkId; +use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender}; +use reth_network_peers::PeerId; +use reth_network_types::{PeerAddr, PeerKind}; +use reth_primitives_traits::Block; use std::{ collections::{HashMap, VecDeque}, fmt, @@ -11,31 +31,9 @@ use std::{ }, task::{Context, Poll}, }; - -use alloy_consensus::BlockHeader; -use alloy_primitives::B256; -use rand::seq::SliceRandom; -use reth_eth_wire::{ - BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, - NewBlockHashes, Status, -}; -use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender}; -use reth_network_peers::PeerId; -use reth_network_types::{PeerAddr, PeerKind}; -use reth_primitives::ForkId; -use reth_primitives_traits::Block; use tokio::sync::oneshot; use tracing::{debug, trace}; -use crate::{ - cache::LruCache, - discovery::Discovery, - fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, - message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult}, - peers::{PeerAction, PeersManager}, - FetchClient, -}; - /// Cache limit of blocks to keep track of for a single peer. const PEER_BLOCK_CACHE_LIMIT: u32 = 512; diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 655934f20..47447783f 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,20 +1,3 @@ -use std::{ - io, - net::SocketAddr, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use futures::Stream; -use reth_eth_wire::{ - capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason, - EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status, -}; -use reth_network_api::{PeerRequest, PeerRequestSender}; -use reth_network_peers::PeerId; -use tracing::trace; - use crate::{ listener::{ConnectionListener, ListenerEvent}, message::PeerMessage, @@ -23,6 +6,21 @@ use crate::{ session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, state::{NetworkState, StateAction}, }; +use futures::Stream; +use reth_eth_wire::{ + capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason, + EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status, +}; +use reth_network_api::{PeerRequest, PeerRequestSender}; +use reth_network_peers::PeerId; +use std::{ + io, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tracing::trace; #[cfg_attr(doc, aquamarine::aquamarine)] /// Contains the connectivity related state of the network. diff --git a/crates/net/network/src/test_utils/init.rs b/crates/net/network/src/test_utils/init.rs index 767f68180..87ccbb5f9 100644 --- a/crates/net/network/src/test_utils/init.rs +++ b/crates/net/network/src/test_utils/init.rs @@ -1,7 +1,6 @@ -use std::{net::SocketAddr, time::Duration}; - use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey}; use reth_network_peers::PeerId; +use std::{net::SocketAddr, time::Duration}; /// The timeout for tests that create a `GethInstance` pub const GETH_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index d92272a87..a64084f2c 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -1,13 +1,13 @@ //! A network implementation for testing purposes. -use std::{ - fmt, - future::Future, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - pin::Pin, - task::{Context, Poll}, +use crate::{ + builder::ETH_REQUEST_CHANNEL_CAPACITY, + error::NetworkError, + eth_requests::EthRequestHandler, + protocol::IntoRlpxSubProtocol, + transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig}, + NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, }; - use futures::{FutureExt, StreamExt}; use pin_project::pin_project; use reth_chainspec::{Hardforks, MAINNET}; @@ -27,6 +27,13 @@ use reth_transaction_pool::{ EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor, }; use secp256k1::SecretKey; +use std::{ + fmt, + future::Future, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + pin::Pin, + task::{Context, Poll}, +}; use tokio::{ sync::{ mpsc::{channel, unbounded_channel}, @@ -35,15 +42,6 @@ use tokio::{ task::JoinHandle, }; -use crate::{ - builder::ETH_REQUEST_CHANNEL_CAPACITY, - error::NetworkError, - eth_requests::EthRequestHandler, - protocol::IntoRlpxSubProtocol, - transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig}, - NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, -}; - /// A test network consisting of multiple peers. pub struct Testnet { /// All running peers in the network. diff --git a/crates/net/network/src/transactions/config.rs b/crates/net/network/src/transactions/config.rs index b838f7cfe..db59ffac5 100644 --- a/crates/net/network/src/transactions/config.rs +++ b/crates/net/network/src/transactions/config.rs @@ -1,5 +1,3 @@ -use derive_more::Constructor; - use super::{ DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, @@ -9,6 +7,7 @@ use crate::transactions::constants::tx_fetcher::{ DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER, }; +use derive_more::Constructor; /// Configuration for managing transactions within the network. #[derive(Debug, Clone)] diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 4c4119c85..0833f6774 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -25,13 +25,18 @@ //! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large //! enough to buffer many hashes during network failure, to allow for recovery. -use std::{ - collections::HashMap, - pin::Pin, - task::{ready, Context, Poll}, - time::Duration, +use super::{ + config::TransactionFetcherConfig, + constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST}, + MessageFilter, PeerMetadata, PooledTransactions, + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, +}; +use crate::{ + cache::{LruCache, LruMap}, + duration_metered_exec, + metrics::TransactionFetcherMetrics, + transactions::{validation, PartiallyFilterMessage}, }; - use alloy_primitives::TxHash; use derive_more::{Constructor, Deref}; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; @@ -47,23 +52,16 @@ use reth_primitives::PooledTransactionsElement; use schnellru::ByLength; #[cfg(debug_assertions)] use smallvec::{smallvec, SmallVec}; +use std::{ + collections::HashMap, + pin::Pin, + task::{ready, Context, Poll}, + time::Duration, +}; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::{debug, trace}; use validation::FilterOutcome; -use super::{ - config::TransactionFetcherConfig, - constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST}, - MessageFilter, PeerMetadata, PooledTransactions, - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, -}; -use crate::{ - cache::{LruCache, LruMap}, - duration_metered_exec, - metrics::TransactionFetcherMetrics, - transactions::{validation, PartiallyFilterMessage}, -}; - /// The type responsible for fetching missing transactions from peers. /// /// This will keep track of unique transaction hashes that are currently being fetched and submits diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 241f01ae8..b499f0ac4 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -18,20 +18,19 @@ pub use validation::*; pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE}; -use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; - -use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - pin::Pin, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, +use crate::{ + budget::{ + DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS, + DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS, + DEFAULT_BUDGET_TRY_DRAIN_STREAM, }, - task::{Context, Poll}, - time::{Duration, Instant}, + cache::LruCache, + duration_metered_exec, metered_poll_nested_stream_with_budget, + metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, + NetworkHandle, }; - use alloy_primitives::{TxHash, B256}; +use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData, @@ -56,22 +55,20 @@ use reth_transaction_pool::{ GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions, TransactionPool, ValidPoolTransaction, }; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::{Duration, Instant}, +}; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::{debug, trace}; -use crate::{ - budget::{ - DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS, - DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS, - DEFAULT_BUDGET_TRY_DRAIN_STREAM, - }, - cache::LruCache, - duration_metered_exec, metered_poll_nested_stream_with_budget, - metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, - NetworkHandle, -}; - /// The future for importing transactions into the pool. /// /// Resolves with the result of each transaction import. diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs index 7bfe07761..1575d9f33 100644 --- a/crates/net/network/src/transactions/validation.rs +++ b/crates/net/network/src/transactions/validation.rs @@ -2,8 +2,6 @@ //! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) //! announcements. Validation and filtering of announcements is network dependent. -use std::{fmt, fmt::Display, mem}; - use crate::metrics::{AnnouncedTxTypesMetrics, TxTypesCounter}; use alloy_primitives::{Signature, TxHash}; use derive_more::{Deref, DerefMut}; @@ -12,6 +10,7 @@ use reth_eth_wire::{ MAX_MESSAGE_SIZE, }; use reth_primitives::TxType; +use std::{fmt, fmt::Display, mem}; use tracing::trace; /// The size of a decoded signature in bytes.