chore(net): Move reth_network::NetworkEvents into reth-network-api (#10062)

This commit is contained in:
Emilia Hane
2024-08-05 22:27:59 +02:00
committed by GitHub
parent f8e6e2e3d4
commit 33f612a41a
34 changed files with 353 additions and 320 deletions

6
Cargo.lock generated
View File

@ -7566,12 +7566,15 @@ dependencies = [
"enr",
"futures",
"reth-eth-wire-types",
"reth-ethereum-forks",
"reth-network-p2p",
"reth-network-peers",
"reth-network-types",
"reth-tokio-util",
"serde",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
@ -7650,7 +7653,7 @@ dependencies = [
"reth-db-api",
"reth-engine-primitives",
"reth-evm",
"reth-network",
"reth-network-api",
"reth-payload-builder",
"reth-payload-primitives",
"reth-provider",
@ -7684,6 +7687,7 @@ dependencies = [
"reth-evm",
"reth-exex",
"reth-network",
"reth-network-api",
"reth-network-p2p",
"reth-node-api",
"reth-node-core",

View File

@ -17,7 +17,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkHandle};
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_network_p2p::{headers::client::HeadersClient, BlockClient};
use reth_primitives::{BlockHashOrNumber, BlockNumber, B256};

View File

@ -1,6 +1,6 @@
use futures_util::StreamExt;
use reth::{
network::{NetworkEvent, NetworkEvents, PeersHandleProvider, PeersInfo},
network::{NetworkEvent, NetworkEventListenerProvider, PeersHandleProvider, PeersInfo},
rpc::types::PeerId,
};
use reth_network_peers::NodeRecord;
@ -16,7 +16,7 @@ pub struct NetworkTestContext<Network> {
impl<Network> NetworkTestContext<Network>
where
Network: NetworkEvents + PeersInfo + PeersHandleProvider,
Network: NetworkEventListenerProvider + PeersInfo + PeersHandleProvider,
{
/// Creates a new network helper
pub fn new(network: Network) -> Self {

View File

@ -10,7 +10,9 @@ use reth_engine_tree::tree::TreeConfig;
use reth_ethereum_engine::service::{ChainEvent, EthService};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkSyncUpdater, SyncState};
use reth_network::{
BlockDownloaderProvider, NetworkEventListenerProvider, NetworkSyncUpdater, SyncState,
};
use reth_node_api::{FullNodeTypes, NodeAddOns};
use reth_node_builder::{
hooks::NodeHooks,

View File

@ -13,11 +13,13 @@ workspace = true
[dependencies]
# reth
reth-eth-wire-types.workspace = true
alloy-rpc-types-admin.workspace = true
reth-network-peers.workspace = true
reth-network-types.workspace = true
reth-network-p2p.workspace = true
reth-eth-wire-types.workspace = true
reth-tokio-util.workspace = true
reth-ethereum-forks.workspace = true
# ethereum
alloy-primitives.workspace = true
@ -27,6 +29,7 @@ enr = { workspace = true, default-features = false, features = ["rust-secp256k1"
# async
futures.workspace = true
tokio-stream.workspace = true
# misc
thiserror.workspace = true

View File

@ -0,0 +1,233 @@
//! API related to listening for network events.
use std::{fmt, net::SocketAddr, sync::Arc};
use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
NodeData, PooledTransactions, Receipts, Status,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_network_types::PeerAddr;
use reth_tokio_util::EventStream;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}
/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
///
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
/// Closed the peer session.
SessionClosed {
/// The identifier of the peer to which a session was closed.
peer_id: PeerId,
/// Why the disconnect was triggered
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
client_version: Arc<str>,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender,
/// The status of the peer to which a session was established.
status: Arc<Status>,
/// negotiated eth version of the session
version: EthVersion,
},
/// Event emitted when a new peer is added
PeerAdded(PeerId),
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}
/// Events produced by the `Discovery` manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
/// Discovered a node
NewNode(DiscoveredEvent),
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
EnrForkId(PeerId, ForkId),
}
/// Represents events related to peer discovery in the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
/// Indicates that a new peer has been discovered and queued for potential connection.
///
/// This event is generated when the system becomes aware of a new peer
/// but hasn't yet established a connection.
///
/// # Fields
///
/// * `peer_id` - The unique identifier of the discovered peer.
/// * `addr` - The network address of the discovered peer.
/// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None`
/// if the peer is not associated with a specific fork.
EventQueued {
/// The unique identifier of the discovered peer.
peer_id: PeerId,
/// The network address of the discovered peer.
addr: PeerAddr,
/// An optional identifier for the fork that this peer is associated with.
/// `None` if the peer is not associated with a specific fork.
fork_id: Option<ForkId>,
},
}
/// Protocol related request messages that expect a response
#[derive(Debug)]
pub enum PeerRequest {
/// Requests block headers from the peer.
///
/// The response should be sent through the channel.
GetBlockHeaders {
/// The request for block headers.
request: GetBlockHeaders,
/// The channel to send the response for block headers.
response: oneshot::Sender<RequestResult<BlockHeaders>>,
},
/// Requests block bodies from the peer.
///
/// The response should be sent through the channel.
GetBlockBodies {
/// The request for block bodies.
request: GetBlockBodies,
/// The channel to send the response for block bodies.
response: oneshot::Sender<RequestResult<BlockBodies>>,
},
/// Requests pooled transactions from the peer.
///
/// The response should be sent through the channel.
GetPooledTransactions {
/// The request for pooled transactions.
request: GetPooledTransactions,
/// The channel to send the response for pooled transactions.
response: oneshot::Sender<RequestResult<PooledTransactions>>,
},
/// Requests `NodeData` from the peer.
///
/// The response should be sent through the channel.
GetNodeData {
/// The request for `NodeData`.
request: GetNodeData,
/// The channel to send the response for `NodeData`.
response: oneshot::Sender<RequestResult<NodeData>>,
},
/// Requests receipts from the peer.
///
/// The response should be sent through the channel.
GetReceipts {
/// The request for receipts.
request: GetReceipts,
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts>>,
},
}
// === impl PeerRequest ===
impl PeerRequest {
/// Invoked if we received a response which does not match the request
pub fn send_bad_response(self) {
self.send_err_response(RequestError::BadResponse)
}
/// Send an error back to the receiver.
pub fn send_err_response(self, err: RequestError) {
let _ = match self {
Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage {
match self {
Self::GetBlockHeaders { request, .. } => {
EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
}
Self::GetBlockBodies { request, .. } => {
EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
}
Self::GetPooledTransactions { request, .. } => {
EthMessage::GetPooledTransactions(RequestPair {
request_id,
message: request.clone(),
})
}
Self::GetNodeData { request, .. } => {
EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
}
Self::GetReceipts { request, .. } => {
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
}
}
}
/// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
match self {
Self::GetPooledTransactions { request, .. } => Some(request),
_ => None,
}
}
}
/// A Cloneable connection for sending _requests_ directly to the session of a peer.
#[derive(Clone)]
pub struct PeerRequestSender {
/// id of the remote node.
pub peer_id: PeerId,
/// The Sender half connected to a session.
pub to_session_tx: mpsc::Sender<PeerRequest>,
}
// === impl PeerRequestSender ===
impl PeerRequestSender {
/// Constructs a new sender instance that's wired to a session
pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<PeerRequest>) -> Self {
Self { peer_id, to_session_tx }
}
/// Attempts to immediately send a message on this Sender
pub fn try_send(&self, req: PeerRequest) -> Result<(), mpsc::error::TrySendError<PeerRequest>> {
self.to_session_tx.try_send(req)
}
/// Returns the peer id of the remote peer.
pub const fn peer_id(&self) -> &PeerId {
&self.peer_id
}
}
impl fmt::Debug for PeerRequestSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}

View File

@ -16,16 +16,22 @@
pub mod downloaders;
/// Network Error
pub mod error;
pub mod events;
/// Implementation of network traits for that does nothing.
pub mod noop;
pub mod test_utils;
pub use alloy_rpc_types_admin::EthProtocolInfo;
use reth_network_p2p::sync::NetworkSyncUpdater;
pub use reth_network_p2p::BlockClient;
pub use reth_network_types::{PeerKind, Reputation, ReputationChangeKind};
pub use downloaders::BlockDownloaderProvider;
pub use error::NetworkError;
pub use events::{
DiscoveredEvent, DiscoveryEvent, NetworkEvent, NetworkEventListenerProvider, PeerRequest,
PeerRequestSender,
};
use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant};
@ -35,6 +41,31 @@ use reth_network_peers::NodeRecord;
/// The `PeerId` type.
pub type PeerId = alloy_primitives::B512;
/// Helper trait that unifies network API needed to launch node.
pub trait FullNetwork:
BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
impl<T> FullNetwork for T where
T: BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
/// Provides general purpose information about the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkInfo: Send + Sync {

View File

@ -3,7 +3,7 @@ use criterion::*;
use futures::StreamExt;
use pprof::criterion::{Output, PProfProfiler};
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvents};
use reth_network::{test_utils::Testnet, NetworkEventListenerProvider};
use reth_network_api::Peers;
use reth_primitives::U256;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};

View File

@ -15,6 +15,7 @@ use reth_discv5::{DiscoveredPeer, Discv5};
use reth_dns_discovery::{
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
};
use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::PeerAddr;
use reth_primitives::{EnrForkIdEntry, ForkId};
@ -26,7 +27,6 @@ use tracing::trace;
use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
manager::DiscoveredEvent,
};
/// Default max capacity for cache of discovered peers.
@ -328,15 +328,6 @@ impl Discovery {
}
}
/// Events produced by the [`Discovery`] manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
/// Discovered a node
NewNode(DiscoveredEvent),
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
EnrForkId(PeerId, ForkId),
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -138,7 +138,10 @@ mod state;
mod swarm;
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
pub use reth_network_api::{BlockDownloaderProvider, NetworkInfo, Peers, PeersInfo};
pub use reth_network_api::{
BlockDownloaderProvider, DiscoveredEvent, DiscoveryEvent, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, PeerRequest, PeerRequestSender, Peers, PeersInfo,
};
pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};
pub use reth_network_types::{PeersConfig, SessionsConfig};
pub use session::{
@ -149,12 +152,11 @@ pub use session::{
pub use builder::NetworkBuilder;
pub use config::{NetworkConfig, NetworkConfigBuilder};
pub use discovery::{Discovery, DiscoveryEvent};
pub use discovery::Discovery;
pub use fetch::FetchClient;
pub use flattened_response::FlattenedResponse;
pub use manager::{DiscoveredEvent, NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use manager::NetworkManager;
pub use metrics::TxTypesCounter;
pub use network::{FullNetwork, NetworkEvents, NetworkHandle, NetworkProtocols};
pub use network::{NetworkHandle, NetworkProtocols};
pub use swarm::NetworkConnectionState;
pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68};

View File

@ -29,15 +29,14 @@ use std::{
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::CapabilityMessage, Capabilities, DisconnectReason, EthVersion, Status,
};
use reth_eth_wire::{capability::CapabilityMessage, Capabilities, DisconnectReason};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{test_utils::PeersHandle, EthProtocolInfo, NetworkStatus, PeerInfo};
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, ReputationChangeKind};
use reth_primitives::ForkId;
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
@ -54,7 +53,7 @@ use crate::{
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
message::{NewBlockMessage, PeerMessage},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
@ -1044,67 +1043,6 @@ impl Future for NetworkManager {
}
}
/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
///
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
/// Closed the peer session.
SessionClosed {
/// The identifier of the peer to which a session was closed.
peer_id: PeerId,
/// Why the disconnect was triggered
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
client_version: Arc<str>,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender,
/// The status of the peer to which a session was established.
status: Arc<Status>,
/// negotiated eth version of the session
version: EthVersion,
},
/// Event emitted when a new peer is added
PeerAdded(PeerId),
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}
/// Represents events related to peer discovery in the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
/// Indicates that a new peer has been discovered and queued for potential connection.
///
/// This event is generated when the system becomes aware of a new peer
/// but hasn't yet established a connection.
///
/// # Fields
///
/// * `peer_id` - The unique identifier of the discovered peer.
/// * `addr` - The network address of the discovered peer.
/// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None`
/// if the peer is not associated with a specific fork.
EventQueued {
/// The unique identifier of the discovered peer.
peer_id: PeerId,
/// The network address of the discovered peer.
addr: PeerAddr,
/// An optional identifier for the fork that this peer is associated with.
/// `None` if the peer is not associated with a specific fork.
fork_id: Option<ForkId>,
},
}
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
acc_network_handle: Duration,

View File

@ -4,7 +4,6 @@
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use std::{
fmt,
sync::Arc,
task::{ready, Context, Poll},
};
@ -12,16 +11,15 @@ use std::{
use futures::FutureExt;
use reth_eth_wire::{
capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock,
NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
SharedTransactions, Transactions,
GetBlockBodies, GetBlockHeaders, NewBlock, NewBlockHashes, NewPooledTransactionHashes,
NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions,
};
use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_primitives::{
BlockBody, Bytes, Header, PooledTransactionsElement, ReceiptWithBloom, B256,
};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
use tokio::sync::oneshot;
/// Internal form of a `NewBlock` message
#[derive(Debug, Clone)]
@ -75,108 +73,6 @@ pub enum BlockRequest {
GetBlockBodies(GetBlockBodies),
}
/// Protocol related request messages that expect a response
#[derive(Debug)]
pub enum PeerRequest {
/// Requests block headers from the peer.
///
/// The response should be sent through the channel.
GetBlockHeaders {
/// The request for block headers.
request: GetBlockHeaders,
/// The channel to send the response for block headers.
response: oneshot::Sender<RequestResult<BlockHeaders>>,
},
/// Requests block bodies from the peer.
///
/// The response should be sent through the channel.
GetBlockBodies {
/// The request for block bodies.
request: GetBlockBodies,
/// The channel to send the response for block bodies.
response: oneshot::Sender<RequestResult<BlockBodies>>,
},
/// Requests pooled transactions from the peer.
///
/// The response should be sent through the channel.
GetPooledTransactions {
/// The request for pooled transactions.
request: GetPooledTransactions,
/// The channel to send the response for pooled transactions.
response: oneshot::Sender<RequestResult<PooledTransactions>>,
},
/// Requests `NodeData` from the peer.
///
/// The response should be sent through the channel.
GetNodeData {
/// The request for `NodeData`.
request: GetNodeData,
/// The channel to send the response for `NodeData`.
response: oneshot::Sender<RequestResult<NodeData>>,
},
/// Requests receipts from the peer.
///
/// The response should be sent through the channel.
GetReceipts {
/// The request for receipts.
request: GetReceipts,
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts>>,
},
}
// === impl PeerRequest ===
impl PeerRequest {
/// Invoked if we received a response which does not match the request
pub(crate) fn send_bad_response(self) {
self.send_err_response(RequestError::BadResponse)
}
/// Send an error back to the receiver.
pub(crate) fn send_err_response(self, err: RequestError) {
let _ = match self {
Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage {
match self {
Self::GetBlockHeaders { request, .. } => {
EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
}
Self::GetBlockBodies { request, .. } => {
EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
}
Self::GetPooledTransactions { request, .. } => {
EthMessage::GetPooledTransactions(RequestPair {
request_id,
message: request.clone(),
})
}
Self::GetNodeData { request, .. } => {
EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
}
Self::GetReceipts { request, .. } => {
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
}
}
}
/// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
match self {
Self::GetPooledTransactions { request, .. } => Some(request),
_ => None,
}
}
}
/// Corresponding variant for [`PeerRequest`].
#[derive(Debug)]
pub enum PeerResponse {
@ -308,37 +204,3 @@ impl PeerResponseResult {
self.err().is_some()
}
}
/// A Cloneable connection for sending _requests_ directly to the session of a peer.
#[derive(Clone)]
pub struct PeerRequestSender {
/// id of the remote node.
pub(crate) peer_id: PeerId,
/// The Sender half connected to a session.
pub(crate) to_session_tx: mpsc::Sender<PeerRequest>,
}
// === impl PeerRequestSender ===
impl PeerRequestSender {
/// Constructs a new sender instance that's wired to a session
pub(crate) const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<PeerRequest>) -> Self {
Self { peer_id, to_session_tx }
}
/// Attempts to immediately send a message on this Sender
pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError<PeerRequest>> {
self.to_session_tx.try_send(req)
}
/// Returns the peer id of the remote peer.
pub const fn peer_id(&self) -> &PeerId {
&self.peer_id
}
}
impl fmt::Debug for PeerRequestSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}

View File

@ -12,7 +12,9 @@ use reth_discv4::Discv4;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_network_api::{
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, NetworkError, NetworkInfo, NetworkStatus, PeerInfo, Peers, PeersInfo,
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
PeersInfo,
};
use reth_network_p2p::{
sync::{NetworkSyncUpdater, SyncState, SyncStateProvider},
@ -30,36 +32,10 @@ use tokio::sync::{
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
protocol::RlpxSubProtocol, swarm::NetworkConnectionState, transactions::TransactionsHandle,
FetchClient,
config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState,
transactions::TransactionsHandle, FetchClient,
};
/// Helper trait that unifies network API needed to launch node.
pub trait FullNetwork:
BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEvents
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
impl<T> FullNetwork for T where
T: BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEvents
+ PeersInfo
+ Peers
+ Clone
+ 'static
{
}
/// A _shareable_ network frontend. Used to interact with the network.
///
/// See also [`NetworkManager`](crate::NetworkManager).
@ -207,7 +183,7 @@ impl NetworkHandle {
// === API Implementations ===
impl NetworkEvents for NetworkHandle {
impl NetworkEventListenerProvider for NetworkHandle {
fn event_listener(&self) -> EventStream<NetworkEvent> {
self.inner.event_sender.new_listener()
}
@ -434,17 +410,6 @@ struct NetworkInner {
event_sender: EventSender<NetworkEvent>,
}
/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEvents: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}
/// Provides access to modify the network's additional protocol handlers.
pub trait NetworkProtocols: Send + Sync {
/// Adds an additional protocol handler to the `RLPx` sub-protocol list.

View File

@ -18,6 +18,7 @@ use reth_eth_wire::{
Capabilities, DisconnectP2P, DisconnectReason, EthMessage,
};
use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_api::PeerRequest;
use reth_network_p2p::error::RequestError;
use reth_network_peers::PeerId;
use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
@ -31,7 +32,7 @@ use tokio_util::sync::PollSender;
use tracing::{debug, trace};
use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult},
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},

View File

@ -11,8 +11,6 @@ pub use handle::{
SessionCommand,
};
pub use crate::message::PeerRequestSender;
pub use reth_network_api::{Direction, PeerInfo};
use std::{
@ -33,6 +31,7 @@ use reth_eth_wire::{
UnauthedEthStream, UnauthedP2PStream,
};
use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_api::PeerRequestSender;
use reth_network_peers::PeerId;
use reth_network_types::SessionsConfig;
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};

View File

@ -14,6 +14,7 @@ use std::{
use rand::seq::SliceRandom;
use reth_eth_wire::{BlockHashNumber, Capabilities, DisconnectReason, NewBlockHashes, Status};
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use reth_network_types::{PeerAddr, PeerKind};
use reth_primitives::{ForkId, B256};
@ -22,13 +23,9 @@ use tracing::{debug, trace};
use crate::{
cache::LruCache,
discovery::{Discovery, DiscoveryEvent},
discovery::Discovery,
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
manager::DiscoveredEvent,
message::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
peers::{PeerAction, PeersManager},
FetchClient,
};
@ -543,26 +540,28 @@ pub(crate) enum StateAction {
#[cfg(test)]
mod tests {
use crate::{
discovery::Discovery,
fetch::StateFetcher,
message::PeerRequestSender,
peers::PeersManager,
state::{BlockNumReader, NetworkState},
PeerRequest,
};
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion};
use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, Header, B256};
use reth_provider::test_utils::NoopProvider;
use std::{
future::poll_fn,
sync::{atomic::AtomicU64, Arc},
};
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion};
use reth_network_api::PeerRequestSender;
use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, Header, B256};
use reth_provider::test_utils::NoopProvider;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use crate::{
discovery::Discovery,
fetch::StateFetcher,
peers::PeersManager,
state::{BlockNumReader, NetworkState},
PeerRequest,
};
/// Returns a testing instance of the [`NetworkState`].
fn state() -> NetworkState {
let peers = PeersManager::default();

View File

@ -10,12 +10,13 @@ use futures::Stream;
use reth_eth_wire::{
capability::CapabilityMessage, errors::EthStreamError, Capabilities, EthVersion, Status,
};
use reth_network_api::PeerRequestSender;
use reth_network_peers::PeerId;
use tracing::trace;
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
message::PeerMessage,
peers::InboundConnectionError,
protocol::IntoRlpxSubProtocol,
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},

View File

@ -14,7 +14,7 @@ use reth_chainspec::MAINNET;
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_network_api::{
test_utils::{PeersHandle, PeersHandleProvider},
NetworkInfo, Peers,
NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
};
use reth_network_peers::PeerId;
use reth_provider::test_utils::NoopProvider;
@ -41,8 +41,7 @@ use crate::{
eth_requests::EthRequestHandler,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
};
/// A test network consisting of multiple peers.

View File

@ -39,6 +39,7 @@ use reth_eth_wire::{
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
};
use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_primitives::{PooledTransactionsElement, TxHash};
@ -58,7 +59,6 @@ use super::{
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
message::PeerRequest,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};

View File

@ -38,7 +38,9 @@ use reth_eth_wire::{
PooledTransactions, RequestTxHashes, Transactions,
};
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::Peers;
use reth_network_api::{
NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
};
use reth_network_p2p::{
error::{RequestError, RequestResult},
sync::SyncStateProvider,
@ -63,12 +65,9 @@ use crate::{
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
},
cache::LruCache,
duration_metered_exec,
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
metered_poll_nested_stream_with_budget,
duration_metered_exec, metered_poll_nested_stream_with_budget,
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkEvents, NetworkHandle,
NetworkHandle,
};
/// The future for importing transactions into the pool.

View File

@ -1,7 +1,7 @@
use reth_eth_wire::{GetPooledTransactions, PooledTransactions};
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
NetworkEvents, PeerRequest,
NetworkEventListenerProvider, PeerRequest,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState};

View File

@ -10,8 +10,8 @@ use reth_eth_wire::{DisconnectReason, HeadersDirection};
use reth_net_banlist::BanList;
use reth_network::{
test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT},
BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager,
PeersConfig,
BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider,
NetworkManager, PeersConfig,
};
use reth_network_api::{NetworkInfo, Peers, PeersInfo};
use reth_network_p2p::{

View File

@ -7,7 +7,7 @@ use rand::Rng;
use reth_eth_wire::HeadersDirection;
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
BlockDownloaderProvider, NetworkEvents,
BlockDownloaderProvider, NetworkEventListenerProvider,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_p2p::{

View File

@ -4,7 +4,7 @@ use futures::StreamExt;
use reth_eth_wire::EthVersion;
use reth_network::{
test_utils::{PeerConfig, Testnet},
NetworkEvent, NetworkEvents,
NetworkEvent, NetworkEventListenerProvider,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::test_utils::NoopProvider;

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use futures::StreamExt;
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEvents};
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider};
use reth_network_api::PeersInfo;
use reth_primitives::{TransactionSigned, TxLegacy, U256};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};

View File

@ -17,7 +17,7 @@ reth-provider.workspace = true
reth-db-api.workspace = true
reth-engine-primitives.workspace = true
reth-transaction-pool.workspace = true
reth-network.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-tasks.workspace = true
reth-network-api.workspace = true

View File

@ -7,7 +7,7 @@ use reth_db_api::{
database_metrics::{DatabaseMetadata, DatabaseMetrics},
};
use reth_evm::execute::BlockExecutorProvider;
use reth_network::FullNetwork;
use reth_network_api::FullNetwork;
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::FullProvider;
use reth_tasks::TaskExecutor;

View File

@ -49,6 +49,7 @@ reth-rpc-types.workspace = true
reth-engine-util.workspace = true
reth-cli-util.workspace = true
reth-rpc-eth-types.workspace = true
reth-network-api.workspace = true
## async
futures.workspace = true

View File

@ -23,7 +23,8 @@ pub use pool::*;
use reth_consensus::Consensus;
use reth_evm::execute::BlockExecutorProvider;
use reth_network::{FullNetwork, NetworkHandle};
use reth_network::NetworkHandle;
use reth_network_api::FullNetwork;
use reth_payload_builder::PayloadBuilderHandle;
use reth_transaction_pool::TransactionPool;

View File

@ -15,7 +15,7 @@ use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEvents};
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},

View File

@ -28,7 +28,7 @@ reth-tasks = { workspace = true, features = ["rayon"] }
reth-consensus-common.workspace = true
reth-rpc-types-compat.workspace = true
revm-inspectors = { workspace = true, features = ["js-tracer"] }
reth-network-peers.workspace = true
reth-network-peers = { workspace = true, features = ["secp256k1"] }
reth-evm.workspace = true
reth-rpc-eth-types.workspace = true
reth-rpc-server-types.workspace = true

View File

@ -14,7 +14,7 @@
use chainspec::{boot_nodes, bsc_chain_spec};
use reth_discv4::Discv4ConfigBuilder;
use reth_network::{NetworkConfig, NetworkEvent, NetworkEvents, NetworkManager};
use reth_network::{NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager};
use reth_network_api::PeersInfo;
use reth_primitives::{ForkHash, ForkId};
use reth_tracing::{

View File

@ -7,7 +7,9 @@
//! ```
use futures::StreamExt;
use reth_network::{config::rng_secret_key, NetworkConfig, NetworkEvents, NetworkManager};
use reth_network::{
config::rng_secret_key, NetworkConfig, NetworkEventListenerProvider, NetworkManager,
};
use reth_provider::test_utils::NoopProvider;
#[tokio::main]

View File

@ -12,7 +12,7 @@
use chain_cfg::{boot_nodes, head, polygon_chain_spec};
use reth_discv4::Discv4ConfigBuilder;
use reth_network::{
config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEvents, NetworkManager,
config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager,
};
use reth_provider::test_utils::NoopProvider;
use reth_tracing::{