chore(sdk): add NetworkPrimitives for NetworkManager (#12530)

Co-authored-by: dkathiriya <lakshya-sky@users.noreply.github.com>
This commit is contained in:
Darshan Kathiriya
2024-11-15 23:59:53 -05:00
committed by GitHub
parent 7745046b0b
commit 2dc9a06321
15 changed files with 130 additions and 107 deletions

1
Cargo.lock generated
View File

@ -7800,6 +7800,7 @@ dependencies = [
"reth-dns-discovery",
"reth-ecies",
"reth-eth-wire",
"reth-eth-wire-types",
"reth-fs-util",
"reth-metrics",
"reth-net-banlist",

View File

@ -24,6 +24,7 @@ reth-discv4.workspace = true
reth-discv5.workspace = true
reth-dns-discovery.workspace = true
reth-eth-wire.workspace = true
reth-eth-wire-types.workspace = true
reth-ecies.workspace = true
reth-tasks.workspace = true
reth-transaction-pool.workspace = true
@ -111,6 +112,7 @@ serde = [
"reth-dns-discovery/serde",
"reth-eth-wire/serde",
"reth-provider?/serde",
"reth-eth-wire-types/serde",
"alloy-consensus/serde",
"alloy-eips/serde",
"alloy-primitives/serde",
@ -118,7 +120,7 @@ serde = [
"parking_lot/serde",
"rand/serde",
"smallvec/serde",
"url/serde"
"url/serde",
]
test-utils = [
"dep:reth-provider",

View File

@ -1,5 +1,6 @@
//! Builder support for configuring the entire setup.
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
@ -16,8 +17,8 @@ pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256;
/// A builder that can configure all components of the network.
#[allow(missing_debug_implementations)]
pub struct NetworkBuilder<Tx, Eth> {
pub(crate) network: NetworkManager,
pub struct NetworkBuilder<Tx, Eth, N: NetworkPrimitives = EthNetworkPrimitives> {
pub(crate) network: NetworkManager<N>,
pub(crate) transactions: Tx,
pub(crate) request_handler: Eth,
}

View File

@ -6,7 +6,9 @@ use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status};
use reth_eth_wire::{
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_primitives::{ForkFilter, Head};
@ -32,7 +34,7 @@ pub fn rng_secret_key() -> SecretKey {
/// All network related initialization settings.
#[derive(Debug)]
pub struct NetworkConfig<C> {
pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// The client type that can interact with the chain.
///
/// This type is used to fetch the block number after we established a session and received the
@ -66,7 +68,7 @@ pub struct NetworkConfig<C> {
/// first hardfork, `Frontier` for mainnet.
pub fork_filter: ForkFilter,
/// The block importer type.
pub block_import: Box<dyn BlockImport>,
pub block_import: Box<dyn BlockImport<N::Block>>,
/// The default mode of the network.
pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
@ -87,9 +89,9 @@ pub struct NetworkConfig<C> {
// === impl NetworkConfig ===
impl NetworkConfig<()> {
impl<N: NetworkPrimitives> NetworkConfig<(), N> {
/// Convenience method for creating the corresponding builder type
pub fn builder(secret_key: SecretKey) -> NetworkConfigBuilder {
pub fn builder(secret_key: SecretKey) -> NetworkConfigBuilder<N> {
NetworkConfigBuilder::new(secret_key)
}
@ -99,7 +101,7 @@ impl NetworkConfig<()> {
}
}
impl<C> NetworkConfig<C> {
impl<C, N: NetworkPrimitives> NetworkConfig<C, N> {
/// Create a new instance with all mandatory fields set, rest is field with defaults.
pub fn new(client: C, secret_key: SecretKey) -> Self
where
@ -134,12 +136,13 @@ impl<C> NetworkConfig<C> {
}
}
impl<C> NetworkConfig<C>
impl<C, N> NetworkConfig<C, N>
where
C: BlockNumReader + 'static,
N: NetworkPrimitives,
{
/// Convenience method for calling [`NetworkManager::new`].
pub async fn manager(self) -> Result<NetworkManager, NetworkError> {
pub async fn manager(self) -> Result<NetworkManager<N>, NetworkError> {
NetworkManager::new(self).await
}
}
@ -164,7 +167,7 @@ where
/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
#[derive(Debug)]
pub struct NetworkConfigBuilder {
pub struct NetworkConfigBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The node's secret key, from which the node's identity is derived.
secret_key: SecretKey,
/// How to configure discovery over DNS.
@ -196,7 +199,7 @@ pub struct NetworkConfigBuilder {
/// Whether tx gossip is disabled
tx_gossip_disabled: bool,
/// The block importer type
block_import: Option<Box<dyn BlockImport>>,
block_import: Option<Box<dyn BlockImport<N::Block>>>,
/// How to instantiate transactions manager.
transactions_manager_config: TransactionsManagerConfig,
/// The NAT resolver for external IP
@ -206,7 +209,7 @@ pub struct NetworkConfigBuilder {
// === impl NetworkConfigBuilder ===
#[allow(missing_docs)]
impl NetworkConfigBuilder {
impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
/// Create a new builder instance with a random secret key.
pub fn with_rng_secret_key() -> Self {
Self::new(rng_secret_key())
@ -480,7 +483,7 @@ impl NetworkConfigBuilder {
}
/// Sets the block import type.
pub fn block_import(mut self, block_import: Box<dyn BlockImport>) -> Self {
pub fn block_import(mut self, block_import: Box<dyn BlockImport<N::Block>>) -> Self {
self.block_import = Some(block_import);
self
}
@ -490,7 +493,7 @@ impl NetworkConfigBuilder {
pub fn build_with_noop_provider<ChainSpec>(
self,
chain_spec: Arc<ChainSpec>,
) -> NetworkConfig<NoopBlockReader<ChainSpec>>
) -> NetworkConfig<NoopBlockReader<ChainSpec>, N>
where
ChainSpec: EthChainSpec + Hardforks + 'static,
{
@ -509,7 +512,7 @@ impl NetworkConfigBuilder {
/// The given client is to be used for interacting with the chain, for example fetching the
/// corresponding block for a given block hash we receive from a peer in the status message when
/// establishing a connection.
pub fn build<C>(self, client: C) -> NetworkConfig<C>
pub fn build<C>(self, client: C) -> NetworkConfig<C, N>
where
C: ChainSpecProvider<ChainSpec: Hardforks>,
{

View File

@ -12,8 +12,8 @@ use alloy_eips::BlockHashOrNumber;
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts,
HeadersDirection, NodeData, Receipts,
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@ -54,7 +54,7 @@ const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
/// This can be spawned to another task and is supposed to be run as background service.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C> {
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// The client type that can interact with the chain.
client: C,
/// Used for reporting peers.
@ -62,15 +62,15 @@ pub struct EthRequestHandler<C> {
#[allow(dead_code)]
peers: PeersHandle,
/// Incoming request from the [`NetworkManager`](crate::NetworkManager).
incoming_requests: ReceiverStream<IncomingEthRequest>,
incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
/// Metrics for the eth request handler.
metrics: EthRequestHandlerMetrics,
}
// === impl EthRequestHandler ===
impl<C> EthRequestHandler<C> {
impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
/// Create a new instance
pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest>) -> Self {
pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
Self {
client,
peers,
@ -148,7 +148,7 @@ where
&self,
_peer_id: PeerId,
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders>>,
response: oneshot::Sender<RequestResult<BlockHeaders<Header>>>,
) {
self.metrics.eth_headers_requests_received_total.increment(1);
let headers = self.get_headers_response(request);
@ -159,7 +159,7 @@ where
&self,
_peer_id: PeerId,
request: GetBlockBodies,
response: oneshot::Sender<RequestResult<BlockBodies>>,
response: oneshot::Sender<RequestResult<BlockBodies<BlockBody>>>,
) {
self.metrics.eth_bodies_requests_received_total.increment(1);
let mut bodies = Vec::new();
@ -272,7 +272,7 @@ where
/// All `eth` request related to blocks delegated by the network.
#[derive(Debug)]
pub enum IncomingEthRequest {
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Request Block headers from the peer.
///
/// The response should be sent through the channel.
@ -282,7 +282,7 @@ pub enum IncomingEthRequest {
/// The specific block headers requested.
request: GetBlockHeaders,
/// The channel sender for the response containing block headers.
response: oneshot::Sender<RequestResult<BlockHeaders>>,
response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
},
/// Request Block bodies from the peer.
///
@ -293,7 +293,7 @@ pub enum IncomingEthRequest {
/// The specific block bodies requested.
request: GetBlockBodies,
/// The channel sender for the response containing block bodies.
response: oneshot::Sender<RequestResult<BlockBodies>>,
response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
},
/// Request Node Data from the peer.
///

View File

@ -46,7 +46,9 @@
//!
//! ```
//! # async fn launch() {
//! use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
//! use reth_network::{
//! config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
//! };
//! use reth_network_peers::mainnet_nodes;
//! use reth_provider::test_utils::NoopProvider;
//!
@ -59,7 +61,7 @@
//! let config = NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client);
//!
//! // create the network instance
//! let network = NetworkManager::new(config).await.unwrap();
//! let network = NetworkManager::<EthNetworkPrimitives>::new(config).await.unwrap();
//!
//! // keep a handle to the network and spawn it
//! let handle = network.handle().clone();
@ -138,6 +140,7 @@ mod state;
mod swarm;
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
pub use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
pub use reth_network_api::{
BlockDownloaderProvider, DiscoveredEvent, DiscoveryEvent, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, PeerRequest, PeerRequestSender, Peers, PeersInfo,

View File

@ -29,7 +29,10 @@ use std::{
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{capability::CapabilityMessage, Capabilities, DisconnectReason};
use reth_eth_wire::{
capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives,
NetworkPrimitives,
};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
@ -76,17 +79,17 @@ use crate::{
/// include_mmd!("docs/mermaid/network-manager.mmd")
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager {
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The type that manages the actual network part, which includes connections.
swarm: Swarm,
swarm: Swarm<N>,
/// Underlying network handle that can be shared.
handle: NetworkHandle,
handle: NetworkHandle<N>,
/// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage>,
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
/// Handles block imports according to the `eth` protocol.
block_import: Box<dyn BlockImport>,
block_import: Box<dyn BlockImport<N::Block>>,
/// Sender for high level network events.
event_sender: EventSender<NetworkEvent>,
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
/// Sender half to send events to the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent>>,
@ -103,7 +106,7 @@ pub struct NetworkManager {
/// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
/// requests. This channel size is set at
/// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY)
to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest>>,
to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
/// Tracks the number of active session (connected peers).
///
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
@ -116,7 +119,7 @@ pub struct NetworkManager {
}
// === impl NetworkManager ===
impl NetworkManager {
impl<N: NetworkPrimitives> NetworkManager<N> {
/// Sets the dedicated channel for events indented for the
/// [`TransactionsManager`](crate::transactions::TransactionsManager).
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
@ -126,7 +129,7 @@ impl NetworkManager {
/// Sets the dedicated channel for events indented for the
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest>) {
pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
self.to_eth_request_handler = Some(tx);
}
@ -138,7 +141,7 @@ impl NetworkManager {
/// Returns the [`NetworkHandle`] that can be cloned and shared.
///
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
pub const fn handle(&self) -> &NetworkHandle {
pub const fn handle(&self) -> &NetworkHandle<N> {
&self.handle
}
@ -165,7 +168,7 @@ impl NetworkManager {
/// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
/// state of the entire network.
pub async fn new<C: BlockNumReader + 'static>(
config: NetworkConfig<C>,
config: NetworkConfig<C, N>,
) -> Result<Self, NetworkError> {
let NetworkConfig {
client,
@ -253,7 +256,7 @@ impl NetworkManager {
let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
let event_sender: EventSender<NetworkEvent> = Default::default();
let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
let handle = NetworkHandle::new(
Arc::clone(&num_active_peers),
@ -314,14 +317,14 @@ impl NetworkManager {
/// }
/// ```
pub async fn builder<C: BlockNumReader + 'static>(
config: NetworkConfig<C>,
) -> Result<NetworkBuilder<(), ()>, NetworkError> {
config: NetworkConfig<C, N>,
) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
let network = Self::new(config).await?;
Ok(network.into_builder())
}
/// Create a [`NetworkBuilder`] to configure all components of the network
pub const fn into_builder(self) -> NetworkBuilder<(), ()> {
pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
NetworkBuilder { network: self, transactions: (), request_handler: () }
}
@ -369,7 +372,7 @@ impl NetworkManager {
/// Returns a new [`FetchClient`] that can be cloned and shared.
///
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
pub fn fetch_client(&self) -> FetchClient {
pub fn fetch_client(&self) -> FetchClient<N> {
self.swarm.state().fetch_client()
}
@ -416,7 +419,7 @@ impl NetworkManager {
/// Sends an event to the [`EthRequestManager`](crate::eth_requests::EthRequestHandler) if
/// configured.
fn delegate_eth_request(&self, event: IncomingEthRequest) {
fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
if let Some(ref reqs) = self.to_eth_request_handler {
let _ = reqs.try_send(event).map_err(|e| {
if let TrySendError::Full(_) = e {
@ -428,7 +431,7 @@ impl NetworkManager {
}
/// Handle an incoming request from the peer
fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest) {
fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
match req {
PeerRequest::GetBlockHeaders { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
@ -469,7 +472,7 @@ impl NetworkManager {
}
/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, outcome: BlockImportOutcome) {
fn on_block_import_result(&mut self, outcome: BlockImportOutcome<N::Block>) {
let BlockImportOutcome { peer, result } = outcome;
match result {
Ok(validated_block) => match validated_block {
@ -511,7 +514,7 @@ impl NetworkManager {
}
/// Handles a received Message from the peer's session.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
match msg {
PeerMessage::NewBlockHashes(hashes) => {
self.within_pow_or_disconnect(peer_id, |this| {
@ -551,7 +554,7 @@ impl NetworkManager {
}
/// Handler for received messages from a handle
fn on_handle_message(&mut self, msg: NetworkHandleMessage) {
fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
match msg {
NetworkHandleMessage::DiscoveryListener(tx) => {
self.swarm.state_mut().discovery_mut().add_listener(tx);
@ -646,7 +649,7 @@ impl NetworkManager {
}
}
fn on_swarm_event(&mut self, event: SwarmEvent) {
fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
// handle event
match event {
SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
@ -981,7 +984,7 @@ impl NetworkManager {
}
}
impl Future for NetworkManager {
impl<N: NetworkPrimitives> Future for NetworkManager<N> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -11,7 +11,10 @@ use enr::Enr;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_eth_wire::{
DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock,
NewPooledTransactionHashes, SharedTransactions,
};
use reth_network_api::{
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
@ -39,20 +42,20 @@ use crate::{
///
/// See also [`NetworkManager`](crate::NetworkManager).
#[derive(Clone, Debug)]
pub struct NetworkHandle {
pub struct NetworkHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The Arc'ed delegate that contains the state.
inner: Arc<NetworkInner>,
inner: Arc<NetworkInner<N>>,
}
// === impl NetworkHandle ===
impl NetworkHandle {
impl<N: NetworkPrimitives> NetworkHandle<N> {
/// Creates a single new instance.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
num_active_peers: Arc<AtomicUsize>,
listener_address: Arc<Mutex<SocketAddr>>,
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
secret_key: SecretKey,
local_peer_id: PeerId,
peers: PeersHandle,
@ -61,7 +64,7 @@ impl NetworkHandle {
tx_gossip_disabled: bool,
discv4: Option<Discv4>,
discv5: Option<Discv5>,
event_sender: EventSender<NetworkEvent>,
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
nat: Option<NatResolver>,
) -> Self {
let inner = NetworkInner {
@ -89,7 +92,7 @@ impl NetworkHandle {
&self.inner.local_peer_id
}
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage> {
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage<N>> {
&self.inner.to_manager_tx
}
@ -99,7 +102,7 @@ impl NetworkHandle {
}
/// Sends a [`NetworkHandleMessage`] to the manager
pub(crate) fn send_message(&self, msg: NetworkHandleMessage) {
pub(crate) fn send_message(&self, msg: NetworkHandleMessage<N>) {
let _ = self.inner.to_manager_tx.send(msg);
}
@ -113,12 +116,12 @@ impl NetworkHandle {
/// Caution: in `PoS` this is a noop because new blocks are no longer announced over devp2p.
/// Instead they are sent to the node by CL and can be requested over devp2p.
/// Broadcasting new blocks is considered a protocol violation.
pub fn announce_block(&self, block: NewBlock, hash: B256) {
pub fn announce_block(&self, block: NewBlock<N::Block>, hash: B256) {
self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
}
/// Sends a [`PeerRequest`] to the given peer's session.
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest<N>) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}
@ -186,8 +189,8 @@ impl NetworkHandle {
// === API Implementations ===
impl NetworkEventListenerProvider for NetworkHandle {
fn event_listener(&self) -> EventStream<NetworkEvent> {
impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
self.inner.event_sender.new_listener()
}
@ -198,13 +201,13 @@ impl NetworkEventListenerProvider for NetworkHandle {
}
}
impl NetworkProtocols for NetworkHandle {
impl<N: NetworkPrimitives> NetworkProtocols for NetworkHandle<N> {
fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) {
self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol))
}
}
impl PeersInfo for NetworkHandle {
impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
fn num_connected_peers(&self) -> usize {
self.inner.num_active_peers.load(Ordering::Relaxed)
}
@ -337,13 +340,13 @@ impl Peers for NetworkHandle {
}
}
impl PeersHandleProvider for NetworkHandle {
impl<N: NetworkPrimitives> PeersHandleProvider for NetworkHandle<N> {
fn peers_handle(&self) -> &PeersHandle {
&self.inner.peers
}
}
impl NetworkInfo for NetworkHandle {
impl<N: NetworkPrimitives> NetworkInfo for NetworkHandle<N> {
fn local_addr(&self) -> SocketAddr {
*self.inner.listener_address.lock()
}
@ -367,7 +370,7 @@ impl NetworkInfo for NetworkHandle {
}
}
impl SyncStateProvider for NetworkHandle {
impl<N: NetworkPrimitives> SyncStateProvider for NetworkHandle<N> {
fn is_syncing(&self) -> bool {
self.inner.is_syncing.load(Ordering::Relaxed)
}
@ -380,7 +383,7 @@ impl SyncStateProvider for NetworkHandle {
}
}
impl NetworkSyncUpdater for NetworkHandle {
impl<N: NetworkPrimitives> NetworkSyncUpdater for NetworkHandle<N> {
fn update_sync_state(&self, state: SyncState) {
let future_state = state.is_syncing();
let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
@ -396,8 +399,8 @@ impl NetworkSyncUpdater for NetworkHandle {
}
}
impl BlockDownloaderProvider for NetworkHandle {
type Client = FetchClient;
impl<N: NetworkPrimitives> BlockDownloaderProvider for NetworkHandle<N> {
type Client = FetchClient<N>;
async fn fetch_client(&self) -> Result<Self::Client, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
@ -407,11 +410,11 @@ impl BlockDownloaderProvider for NetworkHandle {
}
#[derive(Debug)]
struct NetworkInner {
struct NetworkInner<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Number of active peer sessions the node's currently handling.
num_active_peers: Arc<AtomicUsize>,
/// Sender half of the message channel to the [`crate::NetworkManager`].
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
/// The local address that accepts incoming connections.
listener_address: Arc<Mutex<SocketAddr>>,
/// The secret key used for authenticating sessions.
@ -435,7 +438,7 @@ struct NetworkInner {
/// The instance of the discv5 service
discv5: Option<Discv5>,
/// Sender for high level network events.
event_sender: EventSender<NetworkEvent>,
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
/// The NAT resolver
nat: Option<NatResolver>,
}
@ -448,7 +451,7 @@ pub trait NetworkProtocols: Send + Sync {
/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
#[derive(Debug)]
pub(crate) enum NetworkHandleMessage {
pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Marks a peer as trusted.
AddTrustedPeerId(PeerId),
/// Adds an address for a peer, including its ID, kind, and socket address.
@ -458,7 +461,7 @@ pub(crate) enum NetworkHandleMessage {
/// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.
DisconnectPeer(PeerId, Option<DisconnectReason>),
/// Broadcasts an event to announce a new block to all nodes.
AnnounceBlock(NewBlock, B256),
AnnounceBlock(NewBlock<N::Block>, B256),
/// Sends a list of transactions to the given peer.
SendTransaction {
/// The ID of the peer to which the transactions are sent.
@ -478,12 +481,12 @@ pub(crate) enum NetworkHandleMessage {
/// The peer to send the request to.
peer_id: PeerId,
/// The request to send to the peer's sessions.
request: PeerRequest,
request: PeerRequest<N>,
},
/// Applies a reputation change to the given peer.
ReputationChange(PeerId, ReputationChangeKind),
/// Returns the client that can be used to interact with the network.
FetchClient(oneshot::Sender<FetchClient>),
FetchClient(oneshot::Sender<FetchClient<N>>),
/// Applies a status update.
StatusUpdate {
/// The head status to apply.

View File

@ -34,9 +34,10 @@ use std::{
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68,
PooledTransactions, RequestTxHashes, Transactions,
DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
RequestTxHashes, Transactions,
};
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{
@ -200,15 +201,15 @@ impl TransactionsHandle {
/// propagate new transactions over the network.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool> {
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
/// Access to the transaction pool.
pool: Pool,
/// Network access.
network: NetworkHandle,
network: NetworkHandle<N>,
/// Subscriptions to all network related events.
///
/// From which we get all new incoming transaction related messages.
network_events: EventStream<NetworkEvent>,
network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
/// Transaction fetcher to handle inflight and missing transaction requests.
transaction_fetcher: TransactionFetcher,
/// All currently pending transactions grouped by peers.

View File

@ -8,7 +8,7 @@ use alloy_provider::{ext::AdminApi, ProviderBuilder};
use futures::StreamExt;
use reth_chainspec::MAINNET;
use reth_discv4::Discv4Config;
use reth_eth_wire::{DisconnectReason, HeadersDirection};
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, HeadersDirection};
use reth_net_banlist::BanList;
use reth_network::{
test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT},
@ -204,8 +204,9 @@ async fn test_connect_with_boot_nodes() {
let mut discv4 = Discv4Config::builder();
discv4.add_boot_nodes(mainnet_nodes());
let config =
NetworkConfigBuilder::new(secret_key).discovery(discv4).build(NoopProvider::default());
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.discovery(discv4)
.build(NoopProvider::default());
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
@ -572,7 +573,7 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let peers_config = PeersConfig::default().with_max_inbound(0);
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)

View File

@ -5,6 +5,7 @@ use std::{
use reth_chainspec::MAINNET;
use reth_discv4::{Discv4Config, NatResolver};
use reth_eth_wire::EthNetworkPrimitives;
use reth_network::{
error::{NetworkError, ServiceKind},
Discovery, NetworkConfigBuilder, NetworkManager,
@ -26,7 +27,7 @@ fn is_addr_in_use_kind(err: &NetworkError, kind: ServiceKind) -> bool {
#[tokio::test(flavor = "multi_thread")]
async fn test_is_default_syncing() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(NoopProvider::default());
@ -37,13 +38,13 @@ async fn test_is_default_syncing() {
#[tokio::test(flavor = "multi_thread")]
async fn test_listener_addr_in_use() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(NoopProvider::default());
let network = NetworkManager::new(config).await.unwrap();
let listener_port = network.local_addr().port();
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.listener_port(listener_port)
.disable_discovery()
.build(NoopProvider::default());
@ -72,7 +73,7 @@ async fn test_discovery_addr_in_use() {
#[tokio::test(flavor = "multi_thread")]
async fn test_tcp_port_node_record_no_discovery() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.listener_port(0)
.disable_discovery()
.build_with_noop_provider(MAINNET.clone());
@ -90,7 +91,7 @@ async fn test_tcp_port_node_record_no_discovery() {
#[tokio::test(flavor = "multi_thread")]
async fn test_tcp_port_node_record_discovery() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.listener_port(0)
.discovery_port(0)
.disable_dns_discovery()
@ -109,7 +110,7 @@ async fn test_tcp_port_node_record_discovery() {
#[tokio::test(flavor = "multi_thread")]
async fn test_node_record_address_with_nat() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap())))
.disable_discv4_discovery()
.disable_dns_discovery()
@ -125,7 +126,7 @@ async fn test_node_record_address_with_nat() {
#[tokio::test(flavor = "multi_thread")]
async fn test_node_record_address_with_nat_disable_discovery() {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap())))
.disable_discovery()
.listener_port(0)

View File

@ -14,7 +14,9 @@
use chainspec::{boot_nodes, bsc_chain_spec};
use reth_discv4::Discv4ConfigBuilder;
use reth_network::{NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager};
use reth_network::{
EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager,
};
use reth_network_api::PeersInfo;
use reth_primitives::{ForkHash, ForkId};
use reth_tracing::{
@ -62,7 +64,7 @@ async fn main() {
// latest BSC forkId, we need to override this to allow connections from BSC nodes
let fork_id = ForkId { hash: ForkHash([0x07, 0xb5, 0x43, 0x28]), next: 0 };
net_cfg.fork_filter.set_current_fork_id(fork_id);
let net_manager = NetworkManager::new(net_cfg).await.unwrap();
let net_manager = NetworkManager::<EthNetworkPrimitives>::new(net_cfg).await.unwrap();
// The network handle is our entrypoint into the network.
let net_handle = net_manager.handle().clone();

View File

@ -14,8 +14,8 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use reth::builder::NodeHandle;
use reth_network::{
config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager,
NetworkProtocols,
config::SecretKey, protocol::IntoRlpxSubProtocol, EthNetworkPrimitives, NetworkConfig,
NetworkManager, NetworkProtocols,
};
use reth_network_api::{test_utils::PeersHandleProvider, NetworkInfo};
use reth_node_ethereum::EthereumNode;
@ -53,7 +53,7 @@ fn main() -> eyre::Result<()> {
.build_with_noop_provider(node.chain_spec());
// spawn the second network instance
let subnetwork = NetworkManager::new(net_cfg).await?;
let subnetwork = NetworkManager::<EthNetworkPrimitives>::new(net_cfg).await?;
let subnetwork_peer_id = *subnetwork.peer_id();
let subnetwork_peer_addr = subnetwork.local_addr();
let subnetwork_handle = subnetwork.peers_handle();

View File

@ -8,7 +8,8 @@
use futures::StreamExt;
use reth_network::{
config::rng_secret_key, NetworkConfig, NetworkEventListenerProvider, NetworkManager,
config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkEventListenerProvider,
NetworkManager,
};
use reth_provider::test_utils::NoopProvider;
@ -24,7 +25,7 @@ async fn main() -> eyre::Result<()> {
let config = NetworkConfig::builder(local_key).mainnet_boot_nodes().build(client);
// create the network instance
let network = NetworkManager::new(config).await?;
let network = NetworkManager::<EthNetworkPrimitives>::new(config).await?;
// get a handle to the network to interact with it
let handle = network.handle().clone();

View File

@ -12,7 +12,8 @@
use chain_cfg::{boot_nodes, head, polygon_chain_spec};
use reth_discv4::Discv4ConfigBuilder;
use reth_network::{
config::NetworkMode, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager,
config::NetworkMode, EthNetworkPrimitives, NetworkConfig, NetworkEvent,
NetworkEventListenerProvider, NetworkManager,
};
use reth_tracing::{
tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer,
@ -57,7 +58,7 @@ async fn main() {
discv4_cfg.add_boot_nodes(boot_nodes()).lookup_interval(interval);
let net_cfg = net_cfg.set_discovery_v4(discv4_cfg.build());
let net_manager = NetworkManager::new(net_cfg).await.unwrap();
let net_manager = NetworkManager::<EthNetworkPrimitives>::new(net_cfg).await.unwrap();
// The network handle is our entrypoint into the network.
let net_handle = net_manager.handle();