replaced generic to box dyn (#9443)

This commit is contained in:
Luca Provini
2024-07-12 16:15:23 +02:00
committed by GitHub
parent b4363e2b48
commit a617bd0f3b
10 changed files with 94 additions and 79 deletions

View File

@ -14,28 +14,28 @@ pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256;
/// A builder that can configure all components of the network.
#[allow(missing_debug_implementations)]
pub struct NetworkBuilder<C, Tx, Eth> {
pub(crate) network: NetworkManager<C>,
pub struct NetworkBuilder<Tx, Eth> {
pub(crate) network: NetworkManager,
pub(crate) transactions: Tx,
pub(crate) request_handler: Eth,
}
// === impl NetworkBuilder ===
impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
impl<Tx, Eth> NetworkBuilder<Tx, Eth> {
/// Consumes the type and returns all fields.
pub fn split(self) -> (NetworkManager<C>, Tx, Eth) {
pub fn split(self) -> (NetworkManager, Tx, Eth) {
let Self { network, transactions, request_handler } = self;
(network, transactions, request_handler)
}
/// Returns the network manager.
pub const fn network(&self) -> &NetworkManager<C> {
pub const fn network(&self) -> &NetworkManager {
&self.network
}
/// Returns the mutable network manager.
pub fn network_mut(&mut self) -> &mut NetworkManager<C> {
pub fn network_mut(&mut self) -> &mut NetworkManager {
&mut self.network
}
@ -45,7 +45,7 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
}
/// Consumes the type and returns all fields and also return a [`NetworkHandle`].
pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager<C>, Tx, Eth) {
pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager, Tx, Eth) {
let Self { network, transactions, request_handler } = self;
let handle = network.handle().clone();
(handle, network, transactions, request_handler)
@ -56,7 +56,7 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
) -> NetworkBuilder<C, TransactionsManager<Pool>, Eth> {
) -> NetworkBuilder<TransactionsManager<Pool>, Eth> {
let Self { mut network, request_handler, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
network.set_transactions(tx);
@ -69,7 +69,7 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
pub fn request_handler<Client>(
self,
client: Client,
) -> NetworkBuilder<C, Tx, EthRequestHandler<Client>> {
) -> NetworkBuilder<Tx, EthRequestHandler<Client>> {
let Self { mut network, transactions, .. } = self;
let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);
network.set_eth_request_handler(tx);

View File

@ -121,10 +121,10 @@ impl<C> NetworkConfig<C> {
impl<C> NetworkConfig<C>
where
C: BlockNumReader,
C: BlockNumReader + 'static,
{
/// Convenience method for calling [`NetworkManager::new`].
pub async fn manager(self) -> Result<NetworkManager<C>, NetworkError> {
pub async fn manager(self) -> Result<NetworkManager, NetworkError> {
NetworkManager::new(self).await
}
}
@ -136,8 +136,10 @@ where
/// Starts the networking stack given a [`NetworkConfig`] and returns a handle to the network.
pub async fn start_network(self) -> Result<NetworkHandle, NetworkError> {
let client = self.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(self).await?.request_handler(client).split_with_handle();
let (handle, network, _txpool, eth) = NetworkManager::builder::<C>(self)
.await?
.request_handler::<C>(client)
.split_with_handle();
tokio::task::spawn(network);
// TODO: tokio::task::spawn(txpool);

View File

@ -148,6 +148,7 @@ pub use session::{
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionManager,
};
pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68};
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};

View File

@ -75,9 +75,9 @@ use tracing::{debug, error, trace, warn};
/// include_mmd!("docs/mermaid/network-manager.mmd")
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<C> {
pub struct NetworkManager {
/// The type that manages the actual network part, which includes connections.
swarm: Swarm<C>,
swarm: Swarm,
/// Underlying network handle that can be shared.
handle: NetworkHandle,
/// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
@ -115,7 +115,7 @@ pub struct NetworkManager<C> {
}
// === impl NetworkManager ===
impl<C> NetworkManager<C> {
impl NetworkManager {
/// Sets the dedicated channel for events indented for the
/// [`TransactionsManager`](crate::transactions::TransactionsManager).
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
@ -160,15 +160,14 @@ impl<C> NetworkManager<C> {
}
}
impl<C> NetworkManager<C>
where
C: BlockNumReader,
{
impl NetworkManager {
/// Creates the manager of a new network.
///
/// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
/// state of the entire network.
pub async fn new(config: NetworkConfig<C>) -> Result<Self, NetworkError> {
pub async fn new<C: BlockNumReader + 'static>(
config: NetworkConfig<C>,
) -> Result<Self, NetworkError> {
let NetworkConfig {
client,
secret_key,
@ -241,8 +240,12 @@ where
extra_protocols,
);
let state =
NetworkState::new(client, discovery, peers_manager, Arc::clone(&num_active_peers));
let state = NetworkState::new(
crate::state::BlockNumReader::new(client),
discovery,
peers_manager,
Arc::clone(&num_active_peers),
);
let swarm = Swarm::new(incoming, sessions, state);
@ -306,15 +309,15 @@ where
/// .split_with_handle();
/// }
/// ```
pub async fn builder(
pub async fn builder<C: BlockNumReader + 'static>(
config: NetworkConfig<C>,
) -> Result<NetworkBuilder<C, (), ()>, NetworkError> {
) -> Result<NetworkBuilder<(), ()>, NetworkError> {
let network = Self::new(config).await?;
Ok(network.into_builder())
}
/// Create a [`NetworkBuilder`] to configure all components of the network
pub const fn into_builder(self) -> NetworkBuilder<C, (), ()> {
pub const fn into_builder(self) -> NetworkBuilder<(), ()> {
NetworkBuilder { network: self, transactions: (), request_handler: () }
}
@ -940,10 +943,7 @@ where
}
}
impl<C> NetworkManager<C>
where
C: BlockNumReader + Unpin,
{
impl NetworkManager {
/// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
///
/// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
@ -969,10 +969,7 @@ where
}
}
impl<C> Future for NetworkManager<C>
where
C: BlockNumReader + Unpin,
{
impl Future for NetworkManager {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -20,10 +20,11 @@ use reth_eth_wire::{
use reth_network_api::PeerKind;
use reth_network_peers::PeerId;
use reth_primitives::{ForkId, B256};
use reth_storage_api::BlockNumReader;
use std::{
collections::{HashMap, VecDeque},
fmt,
net::{IpAddr, SocketAddr},
ops::Deref,
sync::{
atomic::{AtomicU64, AtomicUsize},
Arc,
@ -36,6 +37,30 @@ use tracing::{debug, trace};
/// Cache limit of blocks to keep track of for a single peer.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
/// Wrapper type for the [`BlockNumReader`] trait.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
impl BlockNumReader {
/// Create a new instance with the given reader.
pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
Self(Box::new(reader))
}
}
impl fmt::Debug for BlockNumReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
}
}
impl Deref for BlockNumReader {
type Target = Box<dyn reth_storage_api::BlockNumReader>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// The [`NetworkState`] keeps track of the state of all peers in the network.
///
/// This includes:
@ -47,7 +72,7 @@ const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
///
/// This type is also responsible for responding for received request.
#[derive(Debug)]
pub struct NetworkState<C> {
pub struct NetworkState {
/// All active peers and their state.
active_peers: HashMap<PeerId, ActivePeer>,
/// Manages connections to peers.
@ -58,7 +83,7 @@ pub struct NetworkState<C> {
///
/// This type is used to fetch the block number after we established a session and received the
/// [Status] block hash.
client: C,
client: BlockNumReader,
/// Network discovery.
discovery: Discovery,
/// The type that handles requests.
@ -69,13 +94,10 @@ pub struct NetworkState<C> {
state_fetcher: StateFetcher,
}
impl<C> NetworkState<C>
where
C: BlockNumReader,
{
impl NetworkState {
/// Create a new state instance with the given params
pub(crate) fn new(
client: C,
client: BlockNumReader,
discovery: Discovery,
peers_manager: PeersManager,
num_active_peers: Arc<AtomicUsize>,
@ -523,8 +545,12 @@ pub(crate) enum StateAction {
#[cfg(test)]
mod tests {
use crate::{
discovery::Discovery, fetch::StateFetcher, message::PeerRequestSender, peers::PeersManager,
state::NetworkState, PeerRequest,
discovery::Discovery,
fetch::StateFetcher,
message::PeerRequestSender,
peers::PeersManager,
state::{BlockNumReader, NetworkState},
PeerRequest,
};
use reth_eth_wire::{
capability::{Capabilities, Capability},
@ -542,14 +568,14 @@ mod tests {
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
/// Returns a testing instance of the [`NetworkState`].
fn state() -> NetworkState<NoopProvider> {
fn state() -> NetworkState {
let peers = PeersManager::default();
let handle = peers.handle();
NetworkState {
active_peers: Default::default(),
peers_manager: Default::default(),
queued_messages: Default::default(),
client: NoopProvider::default(),
client: BlockNumReader(Box::new(NoopProvider::default())),
discovery: Discovery::noop(),
state_fetcher: StateFetcher::new(handle, Default::default()),
}

View File

@ -13,7 +13,6 @@ use reth_eth_wire::{
EthVersion, Status,
};
use reth_network_peers::PeerId;
use reth_storage_api::BlockNumReader;
use std::{
io,
net::SocketAddr,
@ -50,23 +49,23 @@ use tracing::trace;
/// `include_mmd!("docs/mermaid/swarm.mmd`")
#[derive(Debug)]
#[must_use = "Swarm does nothing unless polled"]
pub(crate) struct Swarm<C> {
pub(crate) struct Swarm {
/// Listens for new incoming connections.
incoming: ConnectionListener,
/// All sessions.
sessions: SessionManager,
/// Tracks the entire state of the network and handles events received from the sessions.
state: NetworkState<C>,
state: NetworkState,
}
// === impl Swarm ===
impl<C> Swarm<C> {
impl Swarm {
/// Configures a new swarm instance.
pub(crate) const fn new(
incoming: ConnectionListener,
sessions: SessionManager,
state: NetworkState<C>,
state: NetworkState,
) -> Self {
Self { incoming, sessions, state }
}
@ -77,12 +76,12 @@ impl<C> Swarm<C> {
}
/// Access to the state.
pub(crate) const fn state(&self) -> &NetworkState<C> {
pub(crate) const fn state(&self) -> &NetworkState {
&self.state
}
/// Mutable access to the state.
pub(crate) fn state_mut(&mut self) -> &mut NetworkState<C> {
pub(crate) fn state_mut(&mut self) -> &mut NetworkState {
&mut self.state
}
@ -102,10 +101,7 @@ impl<C> Swarm<C> {
}
}
impl<C> Swarm<C>
where
C: BlockNumReader,
{
impl Swarm {
/// Triggers a new outgoing connection to the given node
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
self.sessions.dial_outbound(remote_addr, remote_id)
@ -285,10 +281,7 @@ where
}
}
impl<C> Stream for Swarm<C>
where
C: BlockNumReader + Unpin,
{
impl Stream for Swarm {
type Item = SwarmEvent;
/// This advances all components.

View File

@ -51,7 +51,7 @@ pub struct Testnet<C, Pool> {
impl<C> Testnet<C, TestPool>
where
C: BlockReader + HeaderProvider + Clone,
C: BlockReader + HeaderProvider + Clone + 'static,
{
/// Same as [`Self::try_create_with`] but panics on error
pub async fn create_with(num_peers: usize, provider: C) -> Self {
@ -85,7 +85,7 @@ where
impl<C, Pool> Testnet<C, Pool>
where
C: BlockReader + HeaderProvider + Clone,
C: BlockReader + HeaderProvider + Clone + 'static,
Pool: TransactionPool,
{
/// Return a mutable slice of all peers.
@ -252,7 +252,7 @@ impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
impl<C, Pool> Future for Testnet<C, Pool>
where
C: BlockReader + HeaderProvider + Unpin,
C: BlockReader + HeaderProvider + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
{
type Output = ();
@ -326,7 +326,7 @@ impl<C, Pool> TestnetHandle<C, Pool> {
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
#[pin]
network: NetworkManager<C>,
network: NetworkManager,
#[pin]
request_handler: Option<EthRequestHandler<C>>,
#[pin]
@ -340,7 +340,7 @@ pub struct Peer<C, Pool = TestPool> {
impl<C, Pool> Peer<C, Pool>
where
C: BlockReader + HeaderProvider + Clone,
C: BlockReader + HeaderProvider + Clone + 'static,
Pool: TransactionPool,
{
/// Returns the number of connected peers.
@ -373,7 +373,7 @@ where
}
/// Returns mutable access to the network.
pub fn network_mut(&mut self) -> &mut NetworkManager<C> {
pub fn network_mut(&mut self) -> &mut NetworkManager {
&mut self.network
}
@ -437,7 +437,7 @@ where
impl<C> Peer<C>
where
C: BlockReader + HeaderProvider + Clone,
C: BlockReader + HeaderProvider + Clone + 'static,
{
/// Installs a new [`TestPool`]
pub fn install_test_pool(&mut self) {
@ -447,7 +447,7 @@ where
impl<C, Pool> Future for Peer<C, Pool>
where
C: BlockReader + HeaderProvider + Unpin,
C: BlockReader + HeaderProvider + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
{
type Output = ();
@ -526,7 +526,7 @@ impl<Pool> PeerHandle<Pool> {
impl<C> PeerConfig<C>
where
C: BlockReader + HeaderProvider + Clone,
C: BlockReader + HeaderProvider + Clone + 'static,
{
/// Launches the network and returns the [Peer] that manages it
pub async fn launch(self) -> Result<Peer<C>, NetworkError> {

View File

@ -674,7 +674,7 @@ async fn test_rejected_by_already_connect() {
async fn new_random_peer(
max_in_bound: usize,
trusted_nodes: HashSet<NodeRecord>,
) -> NetworkManager<NoopProvider> {
) -> NetworkManager {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let peers_config =
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);
@ -683,7 +683,7 @@ async fn new_random_peer(
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)
.build(NoopProvider::default());
.build_with_noop_provider();
NetworkManager::new(config).await.unwrap()
}

View File

@ -478,7 +478,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
}
/// Creates the [`NetworkBuilder`] for the node.
pub async fn network_builder(&self) -> eyre::Result<NetworkBuilder<Node::Provider, (), ()>> {
pub async fn network_builder(&self) -> eyre::Result<NetworkBuilder<(), ()>> {
let network_config = self.network_config()?;
let builder = NetworkManager::builder(network_config).await?;
Ok(builder)
@ -488,11 +488,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
///
/// Spawns the configured network and associated tasks and returns the [`NetworkHandle`]
/// connected to that network.
pub fn start_network<Pool>(
&self,
builder: NetworkBuilder<Node::Provider, (), ()>,
pool: Pool,
) -> NetworkHandle
pub fn start_network<Pool>(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle
where
Pool: TransactionPool + Unpin + 'static,
{

View File

@ -49,7 +49,7 @@ pub trait RethNetworkConfig {
// TODO add more network config methods here
}
impl<C> RethNetworkConfig for reth_network::NetworkManager<C> {
impl RethNetworkConfig for reth_network::NetworkManager {
fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
Self::add_rlpx_sub_protocol(self, protocol);
}