feat(discv5): recycle clean up code (#7727)

Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Emilia Hane
2024-04-24 11:53:54 +02:00
committed by GitHub
parent dc6a02ce78
commit 4f81f3acc9
6 changed files with 173 additions and 132 deletions

View File

@ -2,6 +2,7 @@
use std::{
collections::HashSet,
fmt::Debug,
net::{IpAddr, SocketAddr},
};
@ -10,14 +11,7 @@ use discv5::ListenConfig;
use multiaddr::{Multiaddr, Protocol};
use reth_primitives::{Bytes, ForkId, NodeRecord, MAINNET};
use crate::{enr::discv4_id_to_multiaddr_id, filter::MustNotIncludeKeys};
/// L1 EL
pub const ETH: &[u8] = b"eth";
/// L1 CL
pub const ETH2: &[u8] = b"eth2";
/// Optimism
pub const OPSTACK: &[u8] = b"opstack";
use crate::{enr::discv4_id_to_multiaddr_id, filter::MustNotIncludeKeys, network_key};
/// Default interval in seconds at which to run a lookup up query.
///
@ -31,14 +25,18 @@ pub struct ConfigBuilder {
discv5_config: Option<discv5::Config>,
/// Nodes to boot from.
bootstrap_nodes: HashSet<BootNode>,
/// [`ForkId`] to set in local node record.
/// Fork kv-pair to set in local node record. Identifies which network/chain/fork the node
/// belongs, e.g. `(b"opstack", ChainId)` or `(b"eth", ForkId)`.
///
/// Defaults to L1 mainnet if not set.
fork: Option<(&'static [u8], ForkId)>,
/// RLPx TCP port to advertise. Note: so long as `reth_network` handles [`NodeRecord`]s as
/// opposed to [`Enr`](enr::Enr)s, TCP is limited to same IP address as UDP, since
/// [`NodeRecord`] doesn't supply an extra field for and alternative TCP address.
tcp_port: u16,
/// Additional kv-pairs that should be advertised to peers by including in local node record.
other_enr_data: Vec<(&'static str, Bytes)>,
/// List of `(key, rlp-encoded-value)` tuples that should be advertised in local node record
/// (in addition to tcp port, udp port and fork).
other_enr_kv_pairs: Vec<(&'static [u8], Bytes)>,
/// Interval in seconds at which to run a lookup up query to populate kbuckets.
lookup_interval: Option<u64>,
/// Custom filter rules to apply to a discovered peer in order to determine if it should be
@ -52,9 +50,9 @@ impl ConfigBuilder {
let Config {
discv5_config,
bootstrap_nodes,
fork: fork_id,
fork,
tcp_port,
other_enr_data,
other_enr_kv_pairs,
lookup_interval,
discovered_peer_filter,
} = discv5_config;
@ -62,9 +60,9 @@ impl ConfigBuilder {
Self {
discv5_config: Some(discv5_config),
bootstrap_nodes,
fork: Some(fork_id),
fork: Some(fork),
tcp_port,
other_enr_data,
other_enr_kv_pairs,
lookup_interval: Some(lookup_interval),
discovered_peer_filter: Some(discovered_peer_filter),
}
@ -117,9 +115,10 @@ impl ConfigBuilder {
self
}
/// Set [`ForkId`], and key used to identify it, to set in local [`Enr`](discv5::enr::Enr).
pub fn fork(mut self, key: &'static [u8], value: ForkId) -> Self {
self.fork = Some((key, value));
/// Set fork ID kv-pair to set in local [`Enr`](discv5::enr::Enr). This lets peers on discovery
/// network know which chain this node belongs to.
pub fn fork(mut self, network_key: &'static [u8], fork_id: ForkId) -> Self {
self.fork = Some((network_key, fork_id));
self
}
@ -129,9 +128,10 @@ impl ConfigBuilder {
self
}
/// Adds an additional kv-pair to include in the local [`Enr`](discv5::enr::Enr).
pub fn add_enr_kv_pair(mut self, kv_pair: (&'static str, Bytes)) -> Self {
self.other_enr_data.push(kv_pair);
/// Adds an additional kv-pair to include in the local [`Enr`](discv5::enr::Enr). Takes the key
/// to use for the kv-pair and the rlp encoded value.
pub fn add_enr_kv_pair(mut self, key: &'static [u8], value: Bytes) -> Self {
self.other_enr_kv_pairs.push((key, value));
self
}
@ -152,7 +152,7 @@ impl ConfigBuilder {
bootstrap_nodes,
fork,
tcp_port,
other_enr_data,
other_enr_kv_pairs,
lookup_interval,
discovered_peer_filter,
} = self;
@ -160,19 +160,19 @@ impl ConfigBuilder {
let discv5_config = discv5_config
.unwrap_or_else(|| discv5::ConfigBuilder::new(ListenConfig::default()).build());
let fork = fork.unwrap_or((ETH, MAINNET.latest_fork_id()));
let fork = fork.unwrap_or((network_key::ETH, MAINNET.latest_fork_id()));
let lookup_interval = lookup_interval.unwrap_or(DEFAULT_SECONDS_LOOKUP_INTERVAL);
let discovered_peer_filter =
discovered_peer_filter.unwrap_or_else(|| MustNotIncludeKeys::new(&[ETH2]));
discovered_peer_filter.unwrap_or_else(|| MustNotIncludeKeys::new(&[network_key::ETH2]));
Config {
discv5_config,
bootstrap_nodes,
fork,
tcp_port,
other_enr_data,
other_enr_kv_pairs,
lookup_interval,
discovered_peer_filter,
}
@ -187,12 +187,14 @@ pub struct Config {
pub(super) discv5_config: discv5::Config,
/// Nodes to boot from.
pub(super) bootstrap_nodes: HashSet<BootNode>,
/// [`ForkId`] to set in local node record.
/// Fork kv-pair to set in local node record. Identifies which network/chain/fork the node
/// belongs, e.g. `(b"opstack", ChainId)` or `(b"eth", ForkId)`.
pub(super) fork: (&'static [u8], ForkId),
/// RLPx TCP port to advertise.
pub(super) tcp_port: u16,
/// Additional kv-pairs to include in local node record.
pub(super) other_enr_data: Vec<(&'static str, Bytes)>,
/// Additional kv-pairs (besides tcp port, udp port and fork) that should be advertised to
/// peers by including in local node record.
pub(super) other_enr_kv_pairs: Vec<(&'static [u8], Bytes)>,
/// Interval in seconds at which to run a lookup up query with to populate kbuckets.
pub(super) lookup_interval: u64,
/// Custom filter rules to apply to a discovered peer in order to determine if it should be

View File

@ -96,7 +96,7 @@ mod tests {
use alloy_rlp::Bytes;
use discv5::enr::{CombinedKey, Enr};
use crate::config::{ETH, ETH2};
use crate::network_key::{ETH, ETH2};
use super::*;

View File

@ -33,6 +33,7 @@ pub mod enr;
pub mod error;
pub mod filter;
pub mod metrics;
pub mod network_key;
pub use discv5::{self, IpMode};
@ -40,11 +41,13 @@ pub use config::{BootNode, Config, ConfigBuilder};
pub use enr::enr_to_discv4_id;
pub use error::Error;
pub use filter::{FilterOutcome, MustNotIncludeKeys};
use metrics::{DiscoveredPeersMetrics, Discv5Metrics};
/// Default number of times to do pulse lookup queries, at bootstrap (5 second intervals).
/// Default number of times to do pulse lookup queries, at bootstrap (pulse intervals, defaulting
/// to 5 seconds).
///
/// Default is 100 seconds.
/// Default is 100 counts.
pub const DEFAULT_COUNT_PULSE_LOOKUPS_AT_BOOTSTRAP: u64 = 100;
/// Default duration of look up interval, for pulse look ups at bootstrap.
@ -52,7 +55,7 @@ pub const DEFAULT_COUNT_PULSE_LOOKUPS_AT_BOOTSTRAP: u64 = 100;
/// Default is 5 seconds.
pub const DEFAULT_SECONDS_PULSE_LOOKUP_INTERVAL: u64 = 5;
/// Max kbucket index.
/// Max kbucket index is 255.
///
/// This is the max log2distance for 32 byte [`NodeId`](discv5::enr::NodeId) - 1. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
pub const MAX_KBUCKET_INDEX: usize = 255;
@ -71,8 +74,8 @@ pub struct Discv5 {
discv5: Arc<discv5::Discv5>,
/// [`IpMode`] of the the node.
ip_mode: IpMode,
/// Key used in kv-pair to ID chain.
fork_id_key: &'static [u8],
/// Key used in kv-pair to ID chain, e.g. 'opstack' or 'eth'.
fork_key: &'static [u8],
/// Filter applied to a discovered peers before passing it up to app.
discovered_peer_filter: MustNotIncludeKeys,
/// Metrics for underlying [`discv5::Discv5`] node and filtered discovered peers.
@ -165,82 +168,21 @@ impl Discv5 {
//
// 1. make local enr from listen config
//
let Config {
discv5_config,
bootstrap_nodes,
fork,
tcp_port,
other_enr_data,
lookup_interval,
discovered_peer_filter,
} = discv5_config;
let (enr, bc_enr, fork_key, ip_mode) = Self::build_local_enr(sk, &discv5_config);
let (enr, bc_enr, ip_mode, fork_id_key) = {
let mut builder = discv5::enr::Enr::builder();
let (ip_mode, socket) = match discv5_config.listen_config {
ListenConfig::Ipv4 { ip, port } => {
if ip != Ipv4Addr::UNSPECIFIED {
builder.ip4(ip);
}
builder.udp4(port);
builder.tcp4(tcp_port);
(IpMode::Ip4, (ip, port).into())
}
ListenConfig::Ipv6 { ip, port } => {
if ip != Ipv6Addr::UNSPECIFIED {
builder.ip6(ip);
}
builder.udp6(port);
builder.tcp6(tcp_port);
(IpMode::Ip6, (ip, port).into())
}
ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
if ipv4 != Ipv4Addr::UNSPECIFIED {
builder.ip4(ipv4);
}
builder.udp4(ipv4_port);
builder.tcp4(tcp_port);
if ipv6 != Ipv6Addr::UNSPECIFIED {
builder.ip6(ipv6);
}
builder.udp6(ipv6_port);
(IpMode::DualStack, (ipv6, ipv6_port).into())
}
};
// add fork id
let (chain, fork_id) = fork;
builder.add_value_rlp(chain, alloy_rlp::encode(fork_id).into());
// add other data
for (key, value) in other_enr_data {
builder.add_value_rlp(key, alloy_rlp::encode(value).into());
}
// enr v4 not to get confused with discv4, independent versioning enr and
// discovery
let enr = builder.build(sk).expect("should build enr v4");
let EnrCombinedKeyWrapper(enr) = enr.into();
trace!(target: "net::discv5",
?enr,
"local ENR"
);
// backwards compatible enr
let bc_enr = NodeRecord::from_secret_key(socket, sk);
(enr, bc_enr, ip_mode, chain)
};
trace!(target: "net::discv5",
?enr,
"local ENR"
);
//
// 2. start discv5
//
let Config {
discv5_config, bootstrap_nodes, lookup_interval, discovered_peer_filter, ..
} = discv5_config;
let EnrCombinedKeyWrapper(enr) = enr.into();
let sk = discv5::enr::CombinedKey::secp256k1_from_bytes(&mut sk.secret_bytes()).unwrap();
let mut discv5 = match discv5::Discv5::new(enr, sk, discv5_config) {
Ok(discv5) => discv5,
@ -261,17 +203,79 @@ impl Discv5 {
let metrics = Discv5Metrics::default();
//
// 4. bg kbuckets maintenance
// 4. start bg kbuckets maintenance
//
Self::spawn_populate_kbuckets_bg(lookup_interval, metrics.clone(), discv5.clone());
Ok((
Self { discv5, ip_mode, fork_id_key, discovered_peer_filter, metrics },
Self { discv5, ip_mode, fork_key, discovered_peer_filter, metrics },
discv5_updates,
bc_enr,
))
}
fn build_local_enr(
sk: &SecretKey,
config: &Config,
) -> (Enr<SecretKey>, NodeRecord, &'static [u8], IpMode) {
let mut builder = discv5::enr::Enr::builder();
let Config { discv5_config, fork, tcp_port, other_enr_kv_pairs, .. } = config;
let (ip_mode, socket) = match discv5_config.listen_config {
ListenConfig::Ipv4 { ip, port } => {
if ip != Ipv4Addr::UNSPECIFIED {
builder.ip4(ip);
}
builder.udp4(port);
builder.tcp4(*tcp_port);
(IpMode::Ip4, (ip, port).into())
}
ListenConfig::Ipv6 { ip, port } => {
if ip != Ipv6Addr::UNSPECIFIED {
builder.ip6(ip);
}
builder.udp6(port);
builder.tcp6(*tcp_port);
(IpMode::Ip6, (ip, port).into())
}
ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
if ipv4 != Ipv4Addr::UNSPECIFIED {
builder.ip4(ipv4);
}
builder.udp4(ipv4_port);
builder.tcp4(*tcp_port);
if ipv6 != Ipv6Addr::UNSPECIFIED {
builder.ip6(ipv6);
}
builder.udp6(ipv6_port);
(IpMode::DualStack, (ipv6, ipv6_port).into())
}
};
// identifies which network node is on
let (network, fork_value) = fork;
builder.add_value_rlp(network, alloy_rlp::encode(fork_value).into());
// add other data
for (key, value) in other_enr_kv_pairs {
builder.add_value_rlp(key, value.clone().into());
}
// enr v4 not to get confused with discv4, independent versioning enr and
// discovery
let enr = builder.build(sk).expect("should build enr v4");
// backwards compatible enr
let bc_enr = NodeRecord::from_secret_key(socket, sk);
(enr, bc_enr, network, ip_mode)
}
/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
async fn bootstrap(
bootstrap_nodes: HashSet<BootNode>,
@ -423,20 +427,20 @@ impl Discv5 {
return None
}
};
let fork_id = match self.filter_discovered_peer(enr) {
FilterOutcome::Ok => self.get_fork_id(enr).ok(),
FilterOutcome::Ignore { reason } => {
trace!(target: "net::discovery::discv5",
?enr,
reason,
"filtered out discovered peer"
);
if let FilterOutcome::Ignore { reason } = self.filter_discovered_peer(enr) {
trace!(target: "net::discovery::discv5",
?enr,
reason,
"filtered out discovered peer"
);
self.metrics.discovered_peers.increment_established_sessions_filtered(1);
self.metrics.discovered_peers.increment_established_sessions_filtered(1);
return None
}
};
return None
}
let fork_id =
(self.fork_key == network_key::ETH).then(|| self.get_fork_id(enr).ok()).flatten();
trace!(target: "net::discovery::discv5",
?fork_id,
@ -485,7 +489,7 @@ impl Discv5 {
&self,
enr: &discv5::enr::Enr<K>,
) -> Result<ForkId, Error> {
let key = self.fork_id_key;
let key = self.fork_key;
let mut fork_id_bytes = enr.get_raw_rlp(key).ok_or(Error::ForkMissing(key))?;
Ok(ForkId::decode(&mut fork_id_bytes)?)
@ -513,8 +517,8 @@ impl Discv5 {
}
/// Returns the key to use to identify the [`ForkId`] kv-pair on the [`Enr`](discv5::Enr).
pub fn fork_id_key(&self) -> &[u8] {
self.fork_id_key
pub fn fork_key(&self) -> &[u8] {
self.fork_key
}
}
@ -603,6 +607,7 @@ pub async fn lookup(
mod tests {
use super::*;
use ::enr::{CombinedKey, EnrKey};
use reth_primitives::MAINNET;
use secp256k1::rand::thread_rng;
use tracing::trace;
@ -618,7 +623,7 @@ mod tests {
.unwrap(),
),
ip_mode: IpMode::Ip4,
fork_id_key: b"noop",
fork_key: b"noop",
discovered_peer_filter: MustNotIncludeKeys::default(),
metrics: Discv5Metrics::default(),
}
@ -818,4 +823,21 @@ mod tests {
assert_eq!(local_node_id.log2_distance(&target), Some(bucket_index as u64 + 1));
}
}
#[test]
fn build_enr_from_config() {
const TCP_PORT: u16 = 30303;
let fork_id = MAINNET.latest_fork_id();
let config = Config::builder(TCP_PORT).fork(network_key::ETH, fork_id).build();
let sk = SecretKey::new(&mut thread_rng());
let (enr, _, _, _) = Discv5::build_local_enr(&sk, &config);
let decoded_fork_id =
ForkId::decode(&mut enr.get_raw_rlp(network_key::ETH).unwrap()).unwrap();
assert_eq!(fork_id, decoded_fork_id);
assert_eq!(TCP_PORT, enr.tcp4().unwrap()); // listen config is defaulting to ip mode ipv4
}
}

View File

@ -2,7 +2,7 @@
use metrics::{Counter, Gauge};
use reth_metrics::Metrics;
use crate::config::{ETH, ETH2, OPSTACK};
use crate::network_key::{ETH, ETH2, OPSTACK};
/// Information tracked by [`Discv5`](crate::Discv5).
#[derive(Debug, Default, Clone)]
@ -91,13 +91,14 @@ impl DiscoveredPeersMetrics {
#[derive(Metrics, Clone)]
#[metrics(scope = "discv5")]
pub struct AdvertisedChainMetrics {
/// Frequency of node records with a kv-pair with [`OPSTACK`] as key.
/// Frequency of node records with a kv-pair with [`OPSTACK`](crate::network_key) as
/// key.
opstack: Counter,
/// Frequency of node records with a kv-pair with [`ETH`] as key.
/// Frequency of node records with a kv-pair with [`ETH`](crate::network_key) as key.
eth: Counter,
/// Frequency of node records with a kv-pair with [`ETH2`] as key.
/// Frequency of node records with a kv-pair with [`ETH2`](crate::network_key) as key.
eth2: Counter,
}

View File

@ -0,0 +1,11 @@
//! Keys of ENR [`ForkId`](reth_primitives::ForkId) kv-pair. Identifies which network a node
//! belongs to.
/// ENR fork ID kv-pair key, for an Ethereum L1 EL node.
pub const ETH: &[u8] = b"eth";
/// ENR fork ID kv-pair key, for an Ethereum L1 CL node.
pub const ETH2: &[u8] = b"eth2";
/// ENR fork ID kv-pair key, for an Optimism CL node.
pub const OPSTACK: &[u8] = b"opstack";

View File

@ -9,11 +9,12 @@ use crate::{
NetworkHandle, NetworkManager,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::config::OPSTACK;
use reth_discv5::network_key;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status};
use reth_primitives::{
mainnet_nodes, pk2id, sepolia_nodes, ChainSpec, ForkFilter, Head, NodeRecord, PeerId, MAINNET,
mainnet_nodes, pk2id, sepolia_nodes, ChainSpec, ForkFilter, Head, NamedChain, NodeRecord,
PeerId, MAINNET,
};
use reth_provider::{BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
@ -121,15 +122,19 @@ impl<C> NetworkConfig<C> {
) -> Self {
let rlpx_port = self.listener_addr.port();
let chain = self.chain_spec.chain;
let fork_id = self.status.forkid;
let fork_id = self.chain_spec.latest_fork_id();
let boot_nodes = self.boot_nodes.clone();
let mut builder =
reth_discv5::Config::builder(rlpx_port).add_unsigned_boot_nodes(boot_nodes.into_iter());
if chain.is_optimism() {
builder = builder.fork(OPSTACK, fork_id)
if chain.named() == Some(NamedChain::Mainnet) {
builder = builder.fork(network_key::ETH, fork_id)
}
// todo: set op EL fork id
/*if chain.is_optimism() {
builder = builder.fork(network_key::, fork_id)
}*/
self.set_discovery_v5(f(builder))
}