feat: add trusted nodes configuration (#569)

* Add preferred nodes to config

* Add preferred nodes on boot

* Add flag in config for trusted only mode

* Add preferred nodes configuration to config

* Fix comment

* Add preferred nodes to config file

* Rename preferred_nodes to trusted_nodes

* Change preferred to trusted

I renamed preferred_nodes to trusted_nodes in various places.
Seems I missed quite a bit of them.

* Pull trusted_only from config

* Rename DiscoveryConfig to PeersConfig

* Fix last commit

Now actually renamed DiscoveryConfig

* Rename trusted_only to connect_trusted_nodes_only

* Add helper function

* Use HashSet for boot_nodes and trusted_nodes

* Change trusted nodes functions in ConfigBuilder

* Move trusted peers from discv4 to network config

* Add trusted nodes to peers on Manager creation

* Use NodeRecord in trusted_nodes config

* Fix comment

* Move trusted_nodes config to PeersConfig

* Add trusted nodes directly to peers

* Move network_config to Config impl

* Move start_network to NetworkConfig impl
This commit is contained in:
Tomás
2022-12-28 17:48:11 -03:00
committed by GitHub
parent d9d0ba14c4
commit 76e76bb651
5 changed files with 94 additions and 45 deletions

View File

@ -1,4 +1,13 @@
//! Configuration files.
use std::{collections::HashSet, sync::Arc};
use reth_db::database::Database;
use reth_network::{
config::{mainnet_nodes, rng_secret_key},
NetworkConfig,
};
use reth_primitives::{NodeRecord, H256};
use reth_provider::ProviderImpl;
use serde::{Deserialize, Serialize};
/// Configuration for the reth node.
@ -7,6 +16,28 @@ pub struct Config {
/// Configuration for each stage in the pipeline.
// TODO(onbjerg): Can we make this easier to maintain when we add/remove stages?
pub stages: StageConfig,
/// Configuration for the discovery service.
pub peers: PeersConfig,
}
impl Config {
/// Initializes network config from read data
pub fn network_config<DB: Database>(
&self,
db: Arc<DB>,
chain_id: u64,
genesis_hash: H256,
) -> NetworkConfig<ProviderImpl<DB>> {
let peer_config = reth_network::PeersConfig::default()
.with_trusted_nodes(self.peers.trusted_nodes.clone())
.with_connect_trusted_nodes_only(self.peers.connect_trusted_nodes_only);
NetworkConfig::builder(Arc::new(ProviderImpl::new(db)), rng_secret_key())
.boot_nodes(mainnet_nodes())
.peer_config(peer_config)
.genesis_hash(genesis_hash)
.chain_id(chain_id)
.build()
}
}
/// Configuration for each stage in the pipeline.
@ -78,3 +109,12 @@ impl Default for SenderRecoveryConfig {
Self { commit_threshold: 5_000, batch_size: 1000 }
}
}
/// Configuration for peer managing.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct PeersConfig {
/// Trusted nodes to connect to.
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to trusted nodes only?
pub connect_trusted_nodes_only: bool,
}

View File

@ -19,13 +19,7 @@ use reth_db::{
use reth_downloaders::{bodies, headers};
use reth_executor::Config as ExecutorConfig;
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{
config::{mainnet_nodes, rng_secret_key},
error::NetworkError,
NetworkConfig, NetworkHandle, NetworkManager,
};
use reth_primitives::{Account, Header, H256};
use reth_provider::{db_provider::ProviderImpl, BlockProvider, HeaderProvider};
use reth_stages::{
metrics::HeaderMetrics,
stages::{
@ -105,7 +99,8 @@ impl Command {
let genesis_hash = init_genesis(db.clone(), self.chain.genesis.clone())?;
info!("Connecting to p2p");
let network = start_network(network_config(db.clone(), chain_id, genesis_hash)).await?;
let network =
config.network_config(db.clone(), chain_id, genesis_hash).start_network().await?;
// TODO: Are most of these Arcs unnecessary? For example, fetch client is completely
// cloneable on its own
@ -207,31 +202,3 @@ fn init_genesis<DB: Database>(db: Arc<DB>, genesis: Genesis) -> Result<H256, ret
tx.commit()?;
Ok(hash)
}
// TODO: This should be based on some external config
fn network_config<DB: Database>(
db: Arc<DB>,
chain_id: u64,
genesis_hash: H256,
) -> NetworkConfig<ProviderImpl<DB>> {
NetworkConfig::builder(Arc::new(ProviderImpl::new(db)), rng_secret_key())
.boot_nodes(mainnet_nodes())
.genesis_hash(genesis_hash)
.chain_id(chain_id)
.build()
}
/// Starts the networking stack given a [NetworkConfig] and returns a handle to the network.
async fn start_network<C>(config: NetworkConfig<C>) -> Result<NetworkHandle, NetworkError>
where
C: BlockProvider + HeaderProvider + 'static,
{
let client = config.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(config).await?.request_handler(client).split_with_handle();
tokio::task::spawn(network);
// TODO: tokio::task::spawn(txpool);
tokio::task::spawn(eth);
Ok(handle)
}

View File

@ -542,6 +542,7 @@ impl Discv4Service {
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::task::spawn(async move {
self.bootstrap();
while let Some(event) = self.next().await {
trace!(target : "discv4", ?event, "processed");
}

View File

@ -1,15 +1,19 @@
//! Network config support
use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
peers::PeersConfig,
session::SessionsConfig,
NetworkHandle, NetworkManager,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_primitives::{Chain, ForkFilter, Hardfork, NodeRecord, PeerId, H256, MAINNET_GENESIS};
use reth_provider::{BlockProvider, HeaderProvider};
use reth_tasks::TaskExecutor;
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::HashSet,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
@ -36,7 +40,7 @@ pub struct NetworkConfig<C> {
/// The node's secret key, from which the node's identity is derived.
pub secret_key: SecretKey,
/// All boot nodes to start network discovery with.
pub boot_nodes: Vec<NodeRecord>,
pub boot_nodes: HashSet<NodeRecord>,
/// How to set up discovery.
pub discovery_v4_config: Discv4Config,
/// Address to use for discovery
@ -96,6 +100,23 @@ impl<C> NetworkConfig<C> {
}
}
impl<C> NetworkConfig<C>
where
C: BlockProvider + HeaderProvider + 'static,
{
/// Starts the networking stack given a [NetworkConfig] and returns a handle to the network.
pub async fn start_network(self) -> Result<NetworkHandle, NetworkError> {
let client = self.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(self).await?.request_handler(client).split_with_handle();
tokio::task::spawn(network);
// TODO: tokio::task::spawn(txpool);
tokio::task::spawn(eth);
Ok(handle)
}
}
/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
#[allow(missing_docs)]
pub struct NetworkConfigBuilder<C> {
@ -106,7 +127,7 @@ pub struct NetworkConfigBuilder<C> {
/// How to set up discovery.
discovery_v4_builder: Discv4ConfigBuilder,
/// All boot nodes to start network discovery with.
boot_nodes: Vec<NodeRecord>,
boot_nodes: HashSet<NodeRecord>,
/// Address to use for discovery
discovery_addr: Option<SocketAddr>,
/// Listener for incoming connections
@ -144,7 +165,7 @@ impl<C> NetworkConfigBuilder<C> {
client,
secret_key,
discovery_v4_builder: Default::default(),
boot_nodes: vec![],
boot_nodes: Default::default(),
discovery_addr: None,
listener_addr: None,
peers_config: None,
@ -253,7 +274,7 @@ impl<C> NetworkConfigBuilder<C> {
self
}
/// Sets the discv4 config to use.
/// Sets the boot nodes.
pub fn boot_nodes(mut self, nodes: impl IntoIterator<Item = NodeRecord>) -> Self {
self.boot_nodes = nodes.into_iter().collect();
self

View File

@ -9,9 +9,9 @@ use crate::{
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_net_common::ban_list::BanList;
use reth_primitives::{ForkId, PeerId};
use reth_primitives::{ForkId, NodeRecord, PeerId};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
net::{IpAddr, SocketAddr},
task::{Context, Poll},
@ -105,6 +105,8 @@ impl PeersManager {
ban_list,
ban_duration,
backoff_duration,
trusted_nodes,
..
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
@ -112,8 +114,14 @@ impl PeersManager {
// We use half of the interval to decrease the max duration to `150%` in worst case
let unban_interval = ban_duration.min(backoff_duration) / 2;
let mut peers = HashMap::with_capacity(trusted_nodes.len());
for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes {
peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port))));
}
Self {
peers: Default::default(),
peers,
manager_tx,
handle_rx: UnboundedReceiverStream::new(handle_rx),
queued_actions: Default::default(),
@ -794,6 +802,10 @@ pub struct PeersConfig {
/// How long to backoff peers that are we failed to connect to for non-fatal reasons, such as
/// [`DisconnectReason::TooManyPeers`].
pub backoff_duration: Duration,
/// Trusted nodes to connect to.
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to trusted nodes only?
pub connect_trusted_nodes_only: bool,
}
impl Default for PeersConfig {
@ -807,6 +819,8 @@ impl Default for PeersConfig {
ban_duration: Duration::from_secs(60 * 60 * 12),
// backoff peers for 1h
backoff_duration: Duration::from_secs(60 * 60),
trusted_nodes: Default::default(),
connect_trusted_nodes_only: false,
}
}
}
@ -840,9 +854,15 @@ impl PeersConfig {
self
}
/// How often to recheck free slots for outbound connections
pub fn with_slot_refill_interval(mut self, interval: Duration) -> Self {
self.refill_slots_interval = interval;
/// Nodes to always connect to.
pub fn with_trusted_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.trusted_nodes = nodes;
self
}
/// Connect only to trusted nodes.
pub fn with_connect_trusted_nodes_only(mut self, trusted_only: bool) -> Self {
self.connect_trusted_nodes_only = trusted_only;
self
}
}