diff --git a/Cargo.lock b/Cargo.lock index 30ccba214..ec6310c0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7566,12 +7566,15 @@ dependencies = [ "enr", "futures", "reth-eth-wire-types", + "reth-ethereum-forks", "reth-network-p2p", "reth-network-peers", "reth-network-types", + "reth-tokio-util", "serde", "thiserror", "tokio", + "tokio-stream", ] [[package]] @@ -7650,7 +7653,7 @@ dependencies = [ "reth-db-api", "reth-engine-primitives", "reth-evm", - "reth-network", + "reth-network-api", "reth-payload-builder", "reth-payload-primitives", "reth-provider", @@ -7684,6 +7687,7 @@ dependencies = [ "reth-evm", "reth-exex", "reth-network", + "reth-network-api", "reth-network-p2p", "reth-node-api", "reth-node-core", diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index fe8ed21f1..3d451ad6e 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -17,7 +17,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_exex::ExExManagerHandle; -use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkHandle}; +use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_network_p2p::{headers::client::HeadersClient, BlockClient}; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 0514c68e1..e04815700 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,6 +1,6 @@ use futures_util::StreamExt; use reth::{ - network::{NetworkEvent, NetworkEvents, PeersHandleProvider, PeersInfo}, + network::{NetworkEvent, NetworkEventListenerProvider, PeersHandleProvider, PeersInfo}, rpc::types::PeerId, }; use reth_network_peers::NodeRecord; @@ -16,7 +16,7 @@ pub struct NetworkTestContext { impl NetworkTestContext where - Network: NetworkEvents + PeersInfo + PeersHandleProvider, + Network: NetworkEventListenerProvider + PeersInfo + PeersHandleProvider, { /// Creates a new network helper pub fn new(network: Network) -> Self { diff --git a/crates/ethereum/node/src/launch.rs b/crates/ethereum/node/src/launch.rs index d2278699e..444d16056 100644 --- a/crates/ethereum/node/src/launch.rs +++ b/crates/ethereum/node/src/launch.rs @@ -10,7 +10,9 @@ use reth_engine_tree::tree::TreeConfig; use reth_ethereum_engine::service::{ChainEvent, EthService}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_exex::ExExManagerHandle; -use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkSyncUpdater, SyncState}; +use reth_network::{ + BlockDownloaderProvider, NetworkEventListenerProvider, NetworkSyncUpdater, SyncState, +}; use reth_node_api::{FullNodeTypes, NodeAddOns}; use reth_node_builder::{ hooks::NodeHooks, diff --git a/crates/net/network-api/Cargo.toml b/crates/net/network-api/Cargo.toml index b516d467a..650d74904 100644 --- a/crates/net/network-api/Cargo.toml +++ b/crates/net/network-api/Cargo.toml @@ -13,11 +13,13 @@ workspace = true [dependencies] # reth -reth-eth-wire-types.workspace = true alloy-rpc-types-admin.workspace = true reth-network-peers.workspace = true reth-network-types.workspace = true reth-network-p2p.workspace = true +reth-eth-wire-types.workspace = true +reth-tokio-util.workspace = true +reth-ethereum-forks.workspace = true # ethereum alloy-primitives.workspace = true @@ -27,6 +29,7 @@ enr = { workspace = true, default-features = false, features = ["rust-secp256k1" # async futures.workspace = true +tokio-stream.workspace = true # misc thiserror.workspace = true diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs new file mode 100644 index 000000000..d2bd66d1f --- /dev/null +++ b/crates/net/network-api/src/events.rs @@ -0,0 +1,233 @@ +//! API related to listening for network events. + +use std::{fmt, net::SocketAddr, sync::Arc}; + +use reth_eth_wire_types::{ + message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, + EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, + NodeData, PooledTransactions, Receipts, Status, +}; +use reth_ethereum_forks::ForkId; +use reth_network_p2p::error::{RequestError, RequestResult}; +use reth_network_peers::PeerId; +use reth_network_types::PeerAddr; +use reth_tokio_util::EventStream; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// Provides event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkEventListenerProvider: Send + Sync { + /// Creates a new [`NetworkEvent`] listener channel. + fn event_listener(&self) -> EventStream; + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + fn discovery_listener(&self) -> UnboundedReceiverStream; +} + +/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. +/// +/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers +/// etc. +#[derive(Debug, Clone)] +pub enum NetworkEvent { + /// Closed the peer session. + SessionClosed { + /// The identifier of the peer to which a session was closed. + peer_id: PeerId, + /// Why the disconnect was triggered + reason: Option, + }, + /// Established a new session with the given peer. + SessionEstablished { + /// The identifier of the peer to which a session was established. + peer_id: PeerId, + /// The remote addr of the peer to which a session was established. + remote_addr: SocketAddr, + /// The client version of the peer to which a session was established. + client_version: Arc, + /// Capabilities the peer announced + capabilities: Arc, + /// A request channel to the session task. + messages: PeerRequestSender, + /// The status of the peer to which a session was established. + status: Arc, + /// negotiated eth version of the session + version: EthVersion, + }, + /// Event emitted when a new peer is added + PeerAdded(PeerId), + /// Event emitted when a new peer is removed + PeerRemoved(PeerId), +} + +/// Events produced by the `Discovery` manager. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DiscoveryEvent { + /// Discovered a node + NewNode(DiscoveredEvent), + /// Retrieved a [`ForkId`] from the peer via ENR request, See + EnrForkId(PeerId, ForkId), +} + +/// Represents events related to peer discovery in the network. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DiscoveredEvent { + /// Indicates that a new peer has been discovered and queued for potential connection. + /// + /// This event is generated when the system becomes aware of a new peer + /// but hasn't yet established a connection. + /// + /// # Fields + /// + /// * `peer_id` - The unique identifier of the discovered peer. + /// * `addr` - The network address of the discovered peer. + /// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None` + /// if the peer is not associated with a specific fork. + EventQueued { + /// The unique identifier of the discovered peer. + peer_id: PeerId, + /// The network address of the discovered peer. + addr: PeerAddr, + /// An optional identifier for the fork that this peer is associated with. + /// `None` if the peer is not associated with a specific fork. + fork_id: Option, + }, +} + +/// Protocol related request messages that expect a response +#[derive(Debug)] +pub enum PeerRequest { + /// Requests block headers from the peer. + /// + /// The response should be sent through the channel. + GetBlockHeaders { + /// The request for block headers. + request: GetBlockHeaders, + /// The channel to send the response for block headers. + response: oneshot::Sender>, + }, + /// Requests block bodies from the peer. + /// + /// The response should be sent through the channel. + GetBlockBodies { + /// The request for block bodies. + request: GetBlockBodies, + /// The channel to send the response for block bodies. + response: oneshot::Sender>, + }, + /// Requests pooled transactions from the peer. + /// + /// The response should be sent through the channel. + GetPooledTransactions { + /// The request for pooled transactions. + request: GetPooledTransactions, + /// The channel to send the response for pooled transactions. + response: oneshot::Sender>, + }, + /// Requests `NodeData` from the peer. + /// + /// The response should be sent through the channel. + GetNodeData { + /// The request for `NodeData`. + request: GetNodeData, + /// The channel to send the response for `NodeData`. + response: oneshot::Sender>, + }, + /// Requests receipts from the peer. + /// + /// The response should be sent through the channel. + GetReceipts { + /// The request for receipts. + request: GetReceipts, + /// The channel to send the response for receipts. + response: oneshot::Sender>, + }, +} + +// === impl PeerRequest === + +impl PeerRequest { + /// Invoked if we received a response which does not match the request + pub fn send_bad_response(self) { + self.send_err_response(RequestError::BadResponse) + } + + /// Send an error back to the receiver. + pub fn send_err_response(self, err: RequestError) { + let _ = match self { + Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), + Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), + Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(), + Self::GetNodeData { response, .. } => response.send(Err(err)).ok(), + Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), + }; + } + + /// Returns the [`EthMessage`] for this type + pub fn create_request_message(&self, request_id: u64) -> EthMessage { + match self { + Self::GetBlockHeaders { request, .. } => { + EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request }) + } + Self::GetBlockBodies { request, .. } => { + EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() }) + } + Self::GetPooledTransactions { request, .. } => { + EthMessage::GetPooledTransactions(RequestPair { + request_id, + message: request.clone(), + }) + } + Self::GetNodeData { request, .. } => { + EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() }) + } + Self::GetReceipts { request, .. } => { + EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) + } + } + } + + /// Consumes the type and returns the inner [`GetPooledTransactions`] variant. + pub fn into_get_pooled_transactions(self) -> Option { + match self { + Self::GetPooledTransactions { request, .. } => Some(request), + _ => None, + } + } +} + +/// A Cloneable connection for sending _requests_ directly to the session of a peer. +#[derive(Clone)] +pub struct PeerRequestSender { + /// id of the remote node. + pub peer_id: PeerId, + /// The Sender half connected to a session. + pub to_session_tx: mpsc::Sender, +} + +// === impl PeerRequestSender === + +impl PeerRequestSender { + /// Constructs a new sender instance that's wired to a session + pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender) -> Self { + Self { peer_id, to_session_tx } + } + + /// Attempts to immediately send a message on this Sender + pub fn try_send(&self, req: PeerRequest) -> Result<(), mpsc::error::TrySendError> { + self.to_session_tx.try_send(req) + } + + /// Returns the peer id of the remote peer. + pub const fn peer_id(&self) -> &PeerId { + &self.peer_id + } +} + +impl fmt::Debug for PeerRequestSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive() + } +} diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index f549ca8b8..adb4d124d 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -16,16 +16,22 @@ pub mod downloaders; /// Network Error pub mod error; +pub mod events; /// Implementation of network traits for that does nothing. pub mod noop; pub mod test_utils; pub use alloy_rpc_types_admin::EthProtocolInfo; +use reth_network_p2p::sync::NetworkSyncUpdater; pub use reth_network_p2p::BlockClient; pub use reth_network_types::{PeerKind, Reputation, ReputationChangeKind}; pub use downloaders::BlockDownloaderProvider; pub use error::NetworkError; +pub use events::{ + DiscoveredEvent, DiscoveryEvent, NetworkEvent, NetworkEventListenerProvider, PeerRequest, + PeerRequestSender, +}; use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant}; @@ -35,6 +41,31 @@ use reth_network_peers::NodeRecord; /// The `PeerId` type. pub type PeerId = alloy_primitives::B512; +/// Helper trait that unifies network API needed to launch node. +pub trait FullNetwork: + BlockDownloaderProvider + + NetworkSyncUpdater + + NetworkInfo + + NetworkEventListenerProvider + + PeersInfo + + Peers + + Clone + + 'static +{ +} + +impl FullNetwork for T where + T: BlockDownloaderProvider + + NetworkSyncUpdater + + NetworkInfo + + NetworkEventListenerProvider + + PeersInfo + + Peers + + Clone + + 'static +{ +} + /// Provides general purpose information about the network. #[auto_impl::auto_impl(&, Arc)] pub trait NetworkInfo: Send + Sync { diff --git a/crates/net/network/benches/bench.rs b/crates/net/network/benches/bench.rs index c41eb6202..7f6b37177 100644 --- a/crates/net/network/benches/bench.rs +++ b/crates/net/network/benches/bench.rs @@ -3,7 +3,7 @@ use criterion::*; use futures::StreamExt; use pprof::criterion::{Output, PProfProfiler}; use rand::thread_rng; -use reth_network::{test_utils::Testnet, NetworkEvents}; +use reth_network::{test_utils::Testnet, NetworkEventListenerProvider}; use reth_network_api::Peers; use reth_primitives::U256; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 366601c63..feabeed87 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -15,6 +15,7 @@ use reth_discv5::{DiscoveredPeer, Discv5}; use reth_dns_discovery::{ DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, }; +use reth_network_api::{DiscoveredEvent, DiscoveryEvent}; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::PeerAddr; use reth_primitives::{EnrForkIdEntry, ForkId}; @@ -26,7 +27,6 @@ use tracing::trace; use crate::{ cache::LruMap, error::{NetworkError, ServiceKind}, - manager::DiscoveredEvent, }; /// Default max capacity for cache of discovered peers. @@ -328,15 +328,6 @@ impl Discovery { } } -/// Events produced by the [`Discovery`] manager. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum DiscoveryEvent { - /// Discovered a node - NewNode(DiscoveredEvent), - /// Retrieved a [`ForkId`] from the peer via ENR request, See - EnrForkId(PeerId, ForkId), -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 786356329..0e433a388 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -138,7 +138,10 @@ mod state; mod swarm; pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; -pub use reth_network_api::{BlockDownloaderProvider, NetworkInfo, Peers, PeersInfo}; +pub use reth_network_api::{ + BlockDownloaderProvider, DiscoveredEvent, DiscoveryEvent, NetworkEvent, + NetworkEventListenerProvider, NetworkInfo, PeerRequest, PeerRequestSender, Peers, PeersInfo, +}; pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState}; pub use reth_network_types::{PeersConfig, SessionsConfig}; pub use session::{ @@ -149,12 +152,11 @@ pub use session::{ pub use builder::NetworkBuilder; pub use config::{NetworkConfig, NetworkConfigBuilder}; -pub use discovery::{Discovery, DiscoveryEvent}; +pub use discovery::Discovery; pub use fetch::FetchClient; pub use flattened_response::FlattenedResponse; -pub use manager::{DiscoveredEvent, NetworkEvent, NetworkManager}; -pub use message::PeerRequest; +pub use manager::NetworkManager; pub use metrics::TxTypesCounter; -pub use network::{FullNetwork, NetworkEvents, NetworkHandle, NetworkProtocols}; +pub use network::{NetworkHandle, NetworkProtocols}; pub use swarm::NetworkConnectionState; pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68}; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 04d8efc18..023004a66 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -29,15 +29,14 @@ use std::{ use futures::{Future, StreamExt}; use parking_lot::Mutex; -use reth_eth_wire::{ - capability::CapabilityMessage, Capabilities, DisconnectReason, EthVersion, Status, -}; +use reth_eth_wire::{capability::CapabilityMessage, Capabilities, DisconnectReason}; use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; -use reth_network_api::{test_utils::PeersHandle, EthProtocolInfo, NetworkStatus, PeerInfo}; +use reth_network_api::{ + test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, +}; use reth_network_peers::{NodeRecord, PeerId}; -use reth_network_types::{PeerAddr, ReputationChangeKind}; -use reth_primitives::ForkId; +use reth_network_types::ReputationChangeKind; use reth_storage_api::BlockNumReader; use reth_tasks::shutdown::GracefulShutdown; use reth_tokio_util::EventSender; @@ -54,7 +53,7 @@ use crate::{ eth_requests::IncomingEthRequest, import::{BlockImport, BlockImportOutcome, BlockValidation}, listener::ConnectionListener, - message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, + message::{NewBlockMessage, PeerMessage}, metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, network::{NetworkHandle, NetworkHandleMessage}, peers::PeersManager, @@ -1044,67 +1043,6 @@ impl Future for NetworkManager { } } -/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. -/// -/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers -/// etc. -#[derive(Debug, Clone)] -pub enum NetworkEvent { - /// Closed the peer session. - SessionClosed { - /// The identifier of the peer to which a session was closed. - peer_id: PeerId, - /// Why the disconnect was triggered - reason: Option, - }, - /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// The remote addr of the peer to which a session was established. - remote_addr: SocketAddr, - /// The client version of the peer to which a session was established. - client_version: Arc, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Arc, - /// negotiated eth version of the session - version: EthVersion, - }, - /// Event emitted when a new peer is added - PeerAdded(PeerId), - /// Event emitted when a new peer is removed - PeerRemoved(PeerId), -} - -/// Represents events related to peer discovery in the network. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum DiscoveredEvent { - /// Indicates that a new peer has been discovered and queued for potential connection. - /// - /// This event is generated when the system becomes aware of a new peer - /// but hasn't yet established a connection. - /// - /// # Fields - /// - /// * `peer_id` - The unique identifier of the discovered peer. - /// * `addr` - The network address of the discovered peer. - /// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None` - /// if the peer is not associated with a specific fork. - EventQueued { - /// The unique identifier of the discovered peer. - peer_id: PeerId, - /// The network address of the discovered peer. - addr: PeerAddr, - /// An optional identifier for the fork that this peer is associated with. - /// `None` if the peer is not associated with a specific fork. - fork_id: Option, - }, -} - #[derive(Debug, Default)] struct NetworkManagerPollDurations { acc_network_handle: Duration, diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 296f634f0..10f9ddcba 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -4,7 +4,6 @@ //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, use std::{ - fmt, sync::Arc, task::{ready, Context, Poll}, }; @@ -12,16 +11,15 @@ use std::{ use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage, - GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, - NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, - SharedTransactions, Transactions, + GetBlockBodies, GetBlockHeaders, NewBlock, NewBlockHashes, NewPooledTransactionHashes, + NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, }; +use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; -use reth_network_peers::PeerId; use reth_primitives::{ BlockBody, Bytes, Header, PooledTransactionsElement, ReceiptWithBloom, B256, }; -use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot}; +use tokio::sync::oneshot; /// Internal form of a `NewBlock` message #[derive(Debug, Clone)] @@ -75,108 +73,6 @@ pub enum BlockRequest { GetBlockBodies(GetBlockBodies), } -/// Protocol related request messages that expect a response -#[derive(Debug)] -pub enum PeerRequest { - /// Requests block headers from the peer. - /// - /// The response should be sent through the channel. - GetBlockHeaders { - /// The request for block headers. - request: GetBlockHeaders, - /// The channel to send the response for block headers. - response: oneshot::Sender>, - }, - /// Requests block bodies from the peer. - /// - /// The response should be sent through the channel. - GetBlockBodies { - /// The request for block bodies. - request: GetBlockBodies, - /// The channel to send the response for block bodies. - response: oneshot::Sender>, - }, - /// Requests pooled transactions from the peer. - /// - /// The response should be sent through the channel. - GetPooledTransactions { - /// The request for pooled transactions. - request: GetPooledTransactions, - /// The channel to send the response for pooled transactions. - response: oneshot::Sender>, - }, - /// Requests `NodeData` from the peer. - /// - /// The response should be sent through the channel. - GetNodeData { - /// The request for `NodeData`. - request: GetNodeData, - /// The channel to send the response for `NodeData`. - response: oneshot::Sender>, - }, - /// Requests receipts from the peer. - /// - /// The response should be sent through the channel. - GetReceipts { - /// The request for receipts. - request: GetReceipts, - /// The channel to send the response for receipts. - response: oneshot::Sender>, - }, -} - -// === impl PeerRequest === - -impl PeerRequest { - /// Invoked if we received a response which does not match the request - pub(crate) fn send_bad_response(self) { - self.send_err_response(RequestError::BadResponse) - } - - /// Send an error back to the receiver. - pub(crate) fn send_err_response(self, err: RequestError) { - let _ = match self { - Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), - Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), - Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(), - Self::GetNodeData { response, .. } => response.send(Err(err)).ok(), - Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), - }; - } - - /// Returns the [`EthMessage`] for this type - pub fn create_request_message(&self, request_id: u64) -> EthMessage { - match self { - Self::GetBlockHeaders { request, .. } => { - EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request }) - } - Self::GetBlockBodies { request, .. } => { - EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() }) - } - Self::GetPooledTransactions { request, .. } => { - EthMessage::GetPooledTransactions(RequestPair { - request_id, - message: request.clone(), - }) - } - Self::GetNodeData { request, .. } => { - EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() }) - } - Self::GetReceipts { request, .. } => { - EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) - } - } - } - - /// Consumes the type and returns the inner [`GetPooledTransactions`] variant. - pub fn into_get_pooled_transactions(self) -> Option { - match self { - Self::GetPooledTransactions { request, .. } => Some(request), - _ => None, - } - } -} - /// Corresponding variant for [`PeerRequest`]. #[derive(Debug)] pub enum PeerResponse { @@ -308,37 +204,3 @@ impl PeerResponseResult { self.err().is_some() } } - -/// A Cloneable connection for sending _requests_ directly to the session of a peer. -#[derive(Clone)] -pub struct PeerRequestSender { - /// id of the remote node. - pub(crate) peer_id: PeerId, - /// The Sender half connected to a session. - pub(crate) to_session_tx: mpsc::Sender, -} - -// === impl PeerRequestSender === - -impl PeerRequestSender { - /// Constructs a new sender instance that's wired to a session - pub(crate) const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender) -> Self { - Self { peer_id, to_session_tx } - } - - /// Attempts to immediately send a message on this Sender - pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError> { - self.to_session_tx.try_send(req) - } - - /// Returns the peer id of the remote peer. - pub const fn peer_id(&self) -> &PeerId { - &self.peer_id - } -} - -impl fmt::Debug for PeerRequestSender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive() - } -} diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index cb54b7b7b..7ff8d2e97 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -12,7 +12,9 @@ use reth_discv4::Discv4; use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; use reth_network_api::{ test_utils::{PeersHandle, PeersHandleProvider}, - BlockDownloaderProvider, NetworkError, NetworkInfo, NetworkStatus, PeerInfo, Peers, PeersInfo, + BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, + NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, + PeersInfo, }; use reth_network_p2p::{ sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}, @@ -30,36 +32,10 @@ use tokio::sync::{ use tokio_stream::wrappers::UnboundedReceiverStream; use crate::{ - config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest, - protocol::RlpxSubProtocol, swarm::NetworkConnectionState, transactions::TransactionsHandle, - FetchClient, + config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, + transactions::TransactionsHandle, FetchClient, }; -/// Helper trait that unifies network API needed to launch node. -pub trait FullNetwork: - BlockDownloaderProvider - + NetworkSyncUpdater - + NetworkInfo - + NetworkEvents - + PeersInfo - + Peers - + Clone - + 'static -{ -} - -impl FullNetwork for T where - T: BlockDownloaderProvider - + NetworkSyncUpdater - + NetworkInfo - + NetworkEvents - + PeersInfo - + Peers - + Clone - + 'static -{ -} - /// A _shareable_ network frontend. Used to interact with the network. /// /// See also [`NetworkManager`](crate::NetworkManager). @@ -207,7 +183,7 @@ impl NetworkHandle { // === API Implementations === -impl NetworkEvents for NetworkHandle { +impl NetworkEventListenerProvider for NetworkHandle { fn event_listener(&self) -> EventStream { self.inner.event_sender.new_listener() } @@ -434,17 +410,6 @@ struct NetworkInner { event_sender: EventSender, } -/// Provides event subscription for the network. -#[auto_impl::auto_impl(&, Arc)] -pub trait NetworkEvents: Send + Sync { - /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> EventStream; - /// Returns a new [`DiscoveryEvent`] stream. - /// - /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. - fn discovery_listener(&self) -> UnboundedReceiverStream; -} - /// Provides access to modify the network's additional protocol handlers. pub trait NetworkProtocols: Send + Sync { /// Adds an additional protocol handler to the `RLPx` sub-protocol list. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 873825f25..f8aa4fcae 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -18,6 +18,7 @@ use reth_eth_wire::{ Capabilities, DisconnectP2P, DisconnectReason, EthMessage, }; use reth_metrics::common::mpsc::MeteredPollSender; +use reth_network_api::PeerRequest; use reth_network_p2p::error::RequestError; use reth_network_peers::PeerId; use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT; @@ -31,7 +32,7 @@ use tokio_util::sync::PollSender; use tracing::{debug, trace}; use crate::{ - message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, + message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult}, session::{ conn::EthRlpxConnection, handle::{ActiveSessionMessage, SessionCommand}, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index ab07f3875..7209e7414 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -11,8 +11,6 @@ pub use handle::{ SessionCommand, }; -pub use crate::message::PeerRequestSender; - pub use reth_network_api::{Direction, PeerInfo}; use std::{ @@ -33,6 +31,7 @@ use reth_eth_wire::{ UnauthedEthStream, UnauthedP2PStream, }; use reth_metrics::common::mpsc::MeteredPollSender; +use reth_network_api::PeerRequestSender; use reth_network_peers::PeerId; use reth_network_types::SessionsConfig; use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head}; diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 86b982936..bc151b698 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -14,6 +14,7 @@ use std::{ use rand::seq::SliceRandom; use reth_eth_wire::{BlockHashNumber, Capabilities, DisconnectReason, NewBlockHashes, Status}; +use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender}; use reth_network_peers::PeerId; use reth_network_types::{PeerAddr, PeerKind}; use reth_primitives::{ForkId, B256}; @@ -22,13 +23,9 @@ use tracing::{debug, trace}; use crate::{ cache::LruCache, - discovery::{Discovery, DiscoveryEvent}, + discovery::Discovery, fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, - manager::DiscoveredEvent, - message::{ - BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, - PeerResponseResult, - }, + message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult}, peers::{PeerAction, PeersManager}, FetchClient, }; @@ -543,26 +540,28 @@ pub(crate) enum StateAction { #[cfg(test)] mod tests { - use crate::{ - discovery::Discovery, - fetch::StateFetcher, - message::PeerRequestSender, - peers::PeersManager, - state::{BlockNumReader, NetworkState}, - PeerRequest, - }; - use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion}; - use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError}; - use reth_network_peers::PeerId; - use reth_primitives::{BlockBody, Header, B256}; - use reth_provider::test_utils::NoopProvider; use std::{ future::poll_fn, sync::{atomic::AtomicU64, Arc}, }; + + use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion}; + use reth_network_api::PeerRequestSender; + use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError}; + use reth_network_peers::PeerId; + use reth_primitives::{BlockBody, Header, B256}; + use reth_provider::test_utils::NoopProvider; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; + use crate::{ + discovery::Discovery, + fetch::StateFetcher, + peers::PeersManager, + state::{BlockNumReader, NetworkState}, + PeerRequest, + }; + /// Returns a testing instance of the [`NetworkState`]. fn state() -> NetworkState { let peers = PeersManager::default(); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 4a2d1450f..0be7ae1c1 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -10,12 +10,13 @@ use futures::Stream; use reth_eth_wire::{ capability::CapabilityMessage, errors::EthStreamError, Capabilities, EthVersion, Status, }; +use reth_network_api::PeerRequestSender; use reth_network_peers::PeerId; use tracing::trace; use crate::{ listener::{ConnectionListener, ListenerEvent}, - message::{PeerMessage, PeerRequestSender}, + message::PeerMessage, peers::InboundConnectionError, protocol::IntoRlpxSubProtocol, session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index aac4b67fc..913fe55ae 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -14,7 +14,7 @@ use reth_chainspec::MAINNET; use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols}; use reth_network_api::{ test_utils::{PeersHandle, PeersHandleProvider}, - NetworkInfo, Peers, + NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers, }; use reth_network_peers::PeerId; use reth_provider::test_utils::NoopProvider; @@ -41,8 +41,7 @@ use crate::{ eth_requests::EthRequestHandler, protocol::IntoRlpxSubProtocol, transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig}, - NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle, - NetworkManager, + NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, }; /// A test network consisting of multiple peers. diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index f5563d102..71db299ec 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -39,6 +39,7 @@ use reth_eth_wire::{ DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, PartiallyValidData, RequestTxHashes, ValidAnnouncementData, }; +use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_primitives::{PooledTransactionsElement, TxHash}; @@ -58,7 +59,6 @@ use super::{ use crate::{ cache::{LruCache, LruMap}, duration_metered_exec, - message::PeerRequest, metrics::TransactionFetcherMetrics, transactions::{validation, PartiallyFilterMessage}, }; diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 5a46804a4..ba6b0c854 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -38,7 +38,9 @@ use reth_eth_wire::{ PooledTransactions, RequestTxHashes, Transactions, }; use reth_metrics::common::mpsc::UnboundedMeteredReceiver; -use reth_network_api::Peers; +use reth_network_api::{ + NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers, +}; use reth_network_p2p::{ error::{RequestError, RequestResult}, sync::SyncStateProvider, @@ -63,12 +65,9 @@ use crate::{ DEFAULT_BUDGET_TRY_DRAIN_STREAM, }, cache::LruCache, - duration_metered_exec, - manager::NetworkEvent, - message::{PeerRequest, PeerRequestSender}, - metered_poll_nested_stream_with_budget, + duration_metered_exec, metered_poll_nested_stream_with_budget, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, - NetworkEvents, NetworkHandle, + NetworkHandle, }; /// The future for importing transactions into the pool. diff --git a/crates/net/network/tests/it/big_pooled_txs_req.rs b/crates/net/network/tests/it/big_pooled_txs_req.rs index 4ddc0cdd3..866d94b2d 100644 --- a/crates/net/network/tests/it/big_pooled_txs_req.rs +++ b/crates/net/network/tests/it/big_pooled_txs_req.rs @@ -1,7 +1,7 @@ use reth_eth_wire::{GetPooledTransactions, PooledTransactions}; use reth_network::{ test_utils::{NetworkEventStream, Testnet}, - NetworkEvents, PeerRequest, + NetworkEventListenerProvider, PeerRequest, }; use reth_network_api::{NetworkInfo, Peers}; use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState}; diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 9433d87c1..b791ed691 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -10,8 +10,8 @@ use reth_eth_wire::{DisconnectReason, HeadersDirection}; use reth_net_banlist::BanList; use reth_network::{ test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT}, - BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, - PeersConfig, + BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider, + NetworkManager, PeersConfig, }; use reth_network_api::{NetworkInfo, Peers, PeersInfo}; use reth_network_p2p::{ diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs index cca48186c..d10a3c1f7 100644 --- a/crates/net/network/tests/it/requests.rs +++ b/crates/net/network/tests/it/requests.rs @@ -7,7 +7,7 @@ use rand::Rng; use reth_eth_wire::HeadersDirection; use reth_network::{ test_utils::{NetworkEventStream, Testnet}, - BlockDownloaderProvider, NetworkEvents, + BlockDownloaderProvider, NetworkEventListenerProvider, }; use reth_network_api::{NetworkInfo, Peers}; use reth_network_p2p::{ diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 0442cd431..6bc029d8a 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -4,7 +4,7 @@ use futures::StreamExt; use reth_eth_wire::EthVersion; use reth_network::{ test_utils::{PeerConfig, Testnet}, - NetworkEvent, NetworkEvents, + NetworkEvent, NetworkEventListenerProvider, }; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::test_utils::NoopProvider; diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index d5c5ccf63..d38dea0b0 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::StreamExt; use rand::thread_rng; -use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEvents}; +use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider}; use reth_network_api::PeersInfo; use reth_primitives::{TransactionSigned, TxLegacy, U256}; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; diff --git a/crates/node/api/Cargo.toml b/crates/node/api/Cargo.toml index 2ae8f52c9..46ff61396 100644 --- a/crates/node/api/Cargo.toml +++ b/crates/node/api/Cargo.toml @@ -17,7 +17,7 @@ reth-provider.workspace = true reth-db-api.workspace = true reth-engine-primitives.workspace = true reth-transaction-pool.workspace = true -reth-network.workspace = true reth-payload-builder.workspace = true reth-payload-primitives.workspace = true reth-tasks.workspace = true +reth-network-api.workspace = true diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index da4115ec9..8cd72e91e 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -7,7 +7,7 @@ use reth_db_api::{ database_metrics::{DatabaseMetadata, DatabaseMetrics}, }; use reth_evm::execute::BlockExecutorProvider; -use reth_network::FullNetwork; +use reth_network_api::FullNetwork; use reth_payload_builder::PayloadBuilderHandle; use reth_provider::FullProvider; use reth_tasks::TaskExecutor; diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index c10b15615..fe1ef0d0d 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -49,6 +49,7 @@ reth-rpc-types.workspace = true reth-engine-util.workspace = true reth-cli-util.workspace = true reth-rpc-eth-types.workspace = true +reth-network-api.workspace = true ## async futures.workspace = true diff --git a/crates/node/builder/src/components/mod.rs b/crates/node/builder/src/components/mod.rs index 6f0e994d6..e2d35c470 100644 --- a/crates/node/builder/src/components/mod.rs +++ b/crates/node/builder/src/components/mod.rs @@ -23,7 +23,8 @@ pub use pool::*; use reth_consensus::Consensus; use reth_evm::execute::BlockExecutorProvider; -use reth_network::{FullNetwork, NetworkHandle}; +use reth_network::NetworkHandle; +use reth_network_api::FullNetwork; use reth_payload_builder::PayloadBuilderHandle; use reth_transaction_pool::TransactionPool; diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index bd474b577..c23ba3e51 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -15,7 +15,7 @@ use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig}; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; -use reth_network::{BlockDownloaderProvider, NetworkEvents}; +use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider}; use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index d1146021b..a2373df27 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -28,7 +28,7 @@ reth-tasks = { workspace = true, features = ["rayon"] } reth-consensus-common.workspace = true reth-rpc-types-compat.workspace = true revm-inspectors = { workspace = true, features = ["js-tracer"] } -reth-network-peers.workspace = true +reth-network-peers = { workspace = true, features = ["secp256k1"] } reth-evm.workspace = true reth-rpc-eth-types.workspace = true reth-rpc-server-types.workspace = true diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index 558eed171..7756728aa 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -14,7 +14,7 @@ use chainspec::{boot_nodes, bsc_chain_spec}; use reth_discv4::Discv4ConfigBuilder; -use reth_network::{NetworkConfig, NetworkEvent, NetworkEvents, NetworkManager}; +use reth_network::{NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager}; use reth_network_api::PeersInfo; use reth_primitives::{ForkHash, ForkId}; use reth_tracing::{ diff --git a/examples/network/src/main.rs b/examples/network/src/main.rs index 16482ca1f..1d8f436f3 100644 --- a/examples/network/src/main.rs +++ b/examples/network/src/main.rs @@ -7,7 +7,9 @@ //! ``` use futures::StreamExt; -use reth_network::{config::rng_secret_key, NetworkConfig, NetworkEvents, NetworkManager}; +use reth_network::{ + config::rng_secret_key, NetworkConfig, NetworkEventListenerProvider, NetworkManager, +}; use reth_provider::test_utils::NoopProvider; #[tokio::main] diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index acfa3d82b..d93c92ceb 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -12,7 +12,7 @@ use chain_cfg::{boot_nodes, head, polygon_chain_spec}; use reth_discv4::Discv4ConfigBuilder; use reth_network::{ - config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEvents, NetworkManager, + config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; use reth_provider::test_utils::NoopProvider; use reth_tracing::{