feat(cli): add --disable-discovery (#597)

* redo add disable_discovery cli flag

* incorporate option into discovery test

* smol touch up

* rustmft

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
GeemoCandama
2022-12-30 03:56:59 -06:00
committed by GitHub
parent b5d8b6d4dd
commit f5ae970e63
5 changed files with 66 additions and 35 deletions

View File

@ -27,6 +27,7 @@ impl Config {
db: Arc<DB>,
chain_id: u64,
genesis_hash: H256,
disable_discovery: bool,
) -> NetworkConfig<ProviderImpl<DB>> {
let peer_config = reth_network::PeersConfig::default()
.with_trusted_nodes(self.peers.trusted_nodes.clone())
@ -36,6 +37,7 @@ impl Config {
.peer_config(peer_config)
.genesis_hash(genesis_hash)
.chain_id(chain_id)
.set_discovery(disable_discovery)
.build()
}
}

View File

@ -75,6 +75,10 @@ pub struct Command {
/// NOTE: This is a temporary flag
#[arg(long = "debug.tip")]
tip: Option<H256>,
/// Disable the discovery service.
#[arg(short, long)]
disable_discovery: bool,
}
impl Command {
@ -98,8 +102,10 @@ impl Command {
let consensus = Arc::new(BeaconConsensus::new(self.chain.consensus.clone()));
let genesis_hash = init_genesis(db.clone(), self.chain.genesis.clone())?;
let network =
config.network_config(db.clone(), chain_id, genesis_hash).start_network().await?;
let network = config
.network_config(db.clone(), chain_id, genesis_hash, self.disable_discovery)
.start_network()
.await?;
info!(peer_id = ?network.peer_id(), local_addr = %network.local_addr(), "Started p2p networking");

View File

@ -42,7 +42,7 @@ pub struct NetworkConfig<C> {
/// All boot nodes to start network discovery with.
pub boot_nodes: HashSet<NodeRecord>,
/// How to set up discovery.
pub discovery_v4_config: Discv4Config,
pub discovery_v4_config: Option<Discv4Config>,
/// Address to use for discovery
pub discovery_addr: SocketAddr,
/// Address to listen for incoming connections
@ -89,7 +89,7 @@ impl<C> NetworkConfig<C> {
/// Sets the config to use for the discovery v4 protocol.
pub fn set_discovery_v4(mut self, discovery_config: Discv4Config) -> Self {
self.discovery_v4_config = discovery_config;
self.discovery_v4_config = Some(discovery_config);
self
}
@ -125,7 +125,7 @@ pub struct NetworkConfigBuilder<C> {
/// The node's secret key, from which the node's identity is derived.
secret_key: SecretKey,
/// How to set up discovery.
discovery_v4_builder: Discv4ConfigBuilder,
discovery_v4_builder: Option<Discv4ConfigBuilder>,
/// All boot nodes to start network discovery with.
boot_nodes: HashSet<NodeRecord>,
/// Address to use for discovery
@ -164,7 +164,7 @@ impl<C> NetworkConfigBuilder<C> {
Self {
client,
secret_key,
discovery_v4_builder: Default::default(),
discovery_v4_builder: Some(Default::default()),
boot_nodes: Default::default(),
discovery_addr: None,
listener_addr: None,
@ -270,7 +270,7 @@ impl<C> NetworkConfigBuilder<C> {
/// Sets the discv4 config to use.
pub fn discovery(mut self, builder: Discv4ConfigBuilder) -> Self {
self.discovery_v4_builder = builder;
self.discovery_v4_builder = Some(builder);
self
}
@ -280,6 +280,19 @@ impl<C> NetworkConfigBuilder<C> {
self
}
/// Sets the discovery service off on true.
pub fn set_discovery(mut self, disable_discovery: bool) -> Self {
if disable_discovery {
self.disable_discovery();
}
self
}
/// disables discovery.
pub fn disable_discovery(&mut self) {
self.discovery_v4_builder = None;
}
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let peer_id = self.get_peer_id();
@ -322,7 +335,7 @@ impl<C> NetworkConfigBuilder<C> {
client,
secret_key,
boot_nodes,
discovery_v4_config: discovery_v4_builder.build(),
discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()),
discovery_addr: discovery_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
}),

View File

@ -24,15 +24,13 @@ pub struct Discovery {
/// Local ENR of the discovery service.
local_enr: NodeRecord,
/// Handler to interact with the Discovery v4 service
discv4: Discv4,
discv4: Option<Discv4>,
/// All KAD table updates from the discv4 service.
discv4_updates: ReceiverStream<DiscoveryUpdate>,
/// The initial config for the discv4 service
_dsicv4_config: Discv4Config,
discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
/// Events buffered until polled.
queued_events: VecDeque<DiscoveryEvent>,
/// The handle to the spawned discv4 service
_discv4_service: JoinHandle<()>,
_discv4_service: Option<JoinHandle<()>>,
}
impl Discovery {
@ -43,23 +41,26 @@ impl Discovery {
pub async fn new(
discovery_addr: SocketAddr,
sk: SecretKey,
dsicv4_config: Discv4Config,
discv4_config: Option<Discv4Config>,
) -> Result<Self, NetworkError> {
let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk);
let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config {
let (discv4, mut discv4_service) =
Discv4::bind(discovery_addr, local_enr, sk, dsicv4_config.clone())
Discv4::bind(discovery_addr, local_enr, sk, disc_config)
.await
.map_err(NetworkError::Discovery)?;
let discv4_updates = discv4_service.update_stream();
// spawn the service
let _discv4_service = discv4_service.spawn();
(Some(discv4), Some(discv4_updates), Some(_discv4_service))
} else {
(None, None, None)
};
Ok(Self {
local_enr,
discv4,
discv4_updates,
_dsicv4_config: dsicv4_config,
_discv4_service,
discovered_nodes: Default::default(),
queued_events: Default::default(),
@ -69,17 +70,23 @@ impl Discovery {
/// Updates the `eth:ForkId` field in discv4.
#[allow(unused)]
pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
self.discv4.set_eip868_rlp("eth".as_bytes().to_vec(), fork_id)
if let Some(discv4) = &self.discv4 {
discv4.set_eip868_rlp("eth".as_bytes().to_vec(), fork_id)
}
}
/// Bans the [`IpAddr`] in the discovery service.
pub(crate) fn ban_ip(&self, ip: IpAddr) {
self.discv4.ban_ip(ip)
if let Some(discv4) = &self.discv4 {
discv4.ban_ip(ip)
}
}
/// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
pub(crate) fn ban(&self, peer_id: PeerId, ip: IpAddr) {
self.discv4.ban(peer_id, ip)
if let Some(discv4) = &self.discv4 {
discv4.ban(peer_id, ip)
}
}
/// Returns the id with which the local identifies itself in the network
@ -122,7 +129,9 @@ impl Discovery {
}
// drain the update stream
while let Poll::Ready(Some(update)) = self.discv4_updates.poll_next_unpin(cx) {
while let Some(Poll::Ready(Some(update))) =
self.discv4_updates.as_mut().map(|disc_updates| disc_updates.poll_next_unpin(cx))
{
self.on_discv4_update(update)
}
@ -139,7 +148,6 @@ impl Discovery {
///
/// NOTE: This instance does nothing
pub(crate) fn noop() -> Self {
let (_tx, rx) = tokio::sync::mpsc::channel(1);
Self {
discovered_nodes: Default::default(),
local_enr: NodeRecord {
@ -148,11 +156,10 @@ impl Discovery {
udp_port: 0,
id: PeerId::random(),
},
discv4: Discv4::noop(),
discv4_updates: ReceiverStream::new(rx),
_dsicv4_config: Default::default(),
discv4: Default::default(),
discv4_updates: Default::default(),
queued_events: Default::default(),
_discv4_service: tokio::task::spawn(async move {}),
_discv4_service: Default::default(),
}
}
}

View File

@ -160,9 +160,12 @@ where
let incoming = ConnectionListener::bind(listener_addr).await?;
let listener_address = Arc::new(Mutex::new(incoming.local_address()));
discovery_v4_config = discovery_v4_config.map(|mut disc_config| {
// merge configured boot nodes
discovery_v4_config.bootstrap_nodes.extend(boot_nodes.clone());
discovery_v4_config.add_eip868_pair("eth", status.forkid);
disc_config.bootstrap_nodes.extend(boot_nodes.clone());
disc_config.add_eip868_pair("eth", status.forkid);
disc_config
});
let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;
// need to retrieve the addr here since provided port could be `0`