chore(discv5): pub methods (#8057)

This commit is contained in:
Emilia Hane
2024-05-02 22:34:13 +02:00
committed by GitHub
parent 2ac2433a96
commit a590ed7ce5
3 changed files with 182 additions and 185 deletions

View File

@ -143,7 +143,7 @@ impl ConfigBuilder {
}
/// Sets the tcp port to advertise in the local [`Enr`](discv5::enr::Enr).
fn tcp_port(mut self, port: u16) -> Self {
pub fn tcp_port(mut self, port: u16) -> Self {
self.tcp_port = port;
self
}

View File

@ -35,14 +35,12 @@ impl MustIncludeKey {
/// Returns [`FilterOutcome::Ok`] if [`Enr`](discv5::Enr) contains the configured kv-pair key.
pub fn filter(&self, enr: &discv5::Enr) -> FilterOutcome {
if enr.get_raw_rlp(self.key).is_none() {
return FilterOutcome::Ignore { reason: self.ignore_reason() }
return FilterOutcome::Ignore {
reason: format!("{} fork required", String::from_utf8_lossy(self.key)),
}
}
FilterOutcome::Ok
}
fn ignore_reason(&self) -> String {
format!("{} fork required", String::from_utf8_lossy(self.key))
}
}
/// Filter requiring that peers not advertise kv-pairs using certain keys, e.g. b"eth2".
@ -69,20 +67,18 @@ impl MustNotIncludeKeys {
pub fn filter(&self, enr: &discv5::Enr) -> FilterOutcome {
for key in self.keys.iter() {
if matches!(key.filter(enr), FilterOutcome::Ok) {
return FilterOutcome::Ignore { reason: self.ignore_reason() }
return FilterOutcome::Ignore {
reason: format!(
"{} forks not allowed",
self.keys.iter().map(|key| String::from_utf8_lossy(key.key)).format(",")
),
}
}
}
FilterOutcome::Ok
}
fn ignore_reason(&self) -> String {
format!(
"{} forks not allowed",
self.keys.iter().map(|key| String::from_utf8_lossy(key.key)).format(",")
)
}
/// Adds a key that must not be present for any kv-pair in a node record.
pub fn add_disallowed_keys(&mut self, keys: &[&'static [u8]]) {
for key in keys {

View File

@ -161,7 +161,7 @@ impl Discv5 {
//
// 1. make local enr from listen config
//
let (enr, bc_enr, fork_key, ip_mode) = Self::build_local_enr(sk, &discv5_config);
let (enr, bc_enr, fork_key, ip_mode) = build_local_enr(sk, &discv5_config);
trace!(target: "net::discv5",
?enr,
@ -197,14 +197,14 @@ impl Discv5 {
//
// 3. add boot nodes
//
Self::bootstrap(bootstrap_nodes, &discv5).await?;
bootstrap(bootstrap_nodes, &discv5).await?;
let metrics = Discv5Metrics::default();
//
// 4. start bg kbuckets maintenance
//
Self::spawn_populate_kbuckets_bg(
spawn_populate_kbuckets_bg(
lookup_interval,
bootstrap_lookup_interval,
bootstrap_lookup_countdown,
@ -219,169 +219,6 @@ impl Discv5 {
))
}
fn build_local_enr(
sk: &SecretKey,
config: &Config,
) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
let mut builder = discv5::enr::Enr::builder();
let Config { discv5_config, fork, tcp_port, other_enr_kv_pairs, .. } = config;
let (ip_mode, socket) = match discv5_config.listen_config {
ListenConfig::Ipv4 { ip, port } => {
if ip != Ipv4Addr::UNSPECIFIED {
builder.ip4(ip);
}
builder.udp4(port);
builder.tcp4(*tcp_port);
(IpMode::Ip4, (ip, port).into())
}
ListenConfig::Ipv6 { ip, port } => {
if ip != Ipv6Addr::UNSPECIFIED {
builder.ip6(ip);
}
builder.udp6(port);
builder.tcp6(*tcp_port);
(IpMode::Ip6, (ip, port).into())
}
ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
if ipv4 != Ipv4Addr::UNSPECIFIED {
builder.ip4(ipv4);
}
builder.udp4(ipv4_port);
builder.tcp4(*tcp_port);
if ipv6 != Ipv6Addr::UNSPECIFIED {
builder.ip6(ipv6);
}
builder.udp6(ipv6_port);
(IpMode::DualStack, (ipv6, ipv6_port).into())
}
};
// identifies which network node is on
let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
*network_stack_id
});
// add other data
for (key, value) in other_enr_kv_pairs {
builder.add_value_rlp(key, value.clone().into());
}
// enr v4 not to get confused with discv4, independent versioning enr and
// discovery
let enr = builder.build(sk).expect("should build enr v4");
// backwards compatible enr
let bc_enr = NodeRecord::from_secret_key(socket, sk);
(enr, bc_enr, network_stack_id, ip_mode)
}
/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
async fn bootstrap(
bootstrap_nodes: HashSet<BootNode>,
discv5: &Arc<discv5::Discv5>,
) -> Result<(), Error> {
trace!(target: "net::discv5",
?bootstrap_nodes,
"adding bootstrap nodes .."
);
let mut enr_requests = vec![];
for node in bootstrap_nodes {
match node {
BootNode::Enr(node) => {
if let Err(err) = discv5.add_enr(node) {
return Err(Error::AddNodeFailed(err))
}
}
BootNode::Enode(enode) => {
let discv5 = discv5.clone();
enr_requests.push(async move {
if let Err(err) = discv5.request_enr(enode.to_string()).await {
debug!(target: "net::discv5",
?enode,
%err,
"failed adding boot node"
);
}
})
}
}
}
// If a session is established, the ENR is added straight away to discv5 kbuckets
Ok(_ = join_all(enr_requests).await)
}
/// Backgrounds regular look up queries, in order to keep kbuckets populated.
fn spawn_populate_kbuckets_bg(
lookup_interval: u64,
bootstrap_lookup_interval: u64,
bootstrap_lookup_countdown: u64,
metrics: Discv5Metrics,
discv5: Arc<discv5::Discv5>,
) {
task::spawn({
let local_node_id = discv5.local_enr().node_id();
let lookup_interval = Duration::from_secs(lookup_interval);
let metrics = metrics.discovered_peers;
let mut kbucket_index = MAX_KBUCKET_INDEX;
let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
// todo: graceful shutdown
async move {
// make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
// log2distance from local node
for i in (0..bootstrap_lookup_countdown).rev() {
let target = discv5::enr::NodeId::random();
trace!(target: "net::discv5",
%target,
bootstrap_boost_runs_countdown=i,
lookup_interval=format!("{:#?}", pulse_lookup_interval),
"starting bootstrap boost lookup query"
);
lookup(target, &discv5, &metrics).await;
tokio::time::sleep(pulse_lookup_interval).await;
}
// initiate regular lookups to populate kbuckets
loop {
// make sure node is connected to each subtree in the network by target
// selection (ref kademlia)
let target = get_lookup_target(kbucket_index, local_node_id);
trace!(target: "net::discv5",
%target,
lookup_interval=format!("{:#?}", lookup_interval),
"starting periodic lookup query"
);
lookup(target, &discv5, &metrics).await;
if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
// try to populate bucket one step closer
kbucket_index -= 1
} else {
// start over with bucket furthest away
kbucket_index = MAX_KBUCKET_INDEX
}
tokio::time::sleep(lookup_interval).await;
}
}
});
}
/// Process an event from the underlying [`discv5::Discv5`] node.
pub fn on_discv5_update(&mut self, update: discv5::Event) -> Option<DiscoveredPeer> {
match update {
@ -416,7 +253,7 @@ impl Discv5 {
}
/// Processes a discovered peer. Returns `true` if peer is added to
fn on_discovered_peer(
pub fn on_discovered_peer(
&mut self,
enr: &discv5::Enr,
socket: SocketAddr,
@ -467,7 +304,7 @@ impl Discv5 {
///
/// Note: [`discv5::Discv5`] won't initiate a session with any peer with a malformed node
/// record, that advertises a reserved IP address on a WAN network.
fn try_into_reachable(
pub fn try_into_reachable(
&self,
enr: &discv5::Enr,
socket: SocketAddr,
@ -490,13 +327,13 @@ impl Discv5 {
/// Applies filtering rules on an ENR. Returns [`Ok`](FilterOutcome::Ok) if peer should be
/// passed up to app, and [`Ignore`](FilterOutcome::Ignore) if peer should instead be dropped.
fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
self.discovered_peer_filter.filter(enr)
}
/// Returns the [`ForkId`] of the given [`Enr`](discv5::Enr) w.r.t. the local node's network
/// stack, if field is set.
fn get_fork_id<K: discv5::enr::EnrKey>(
pub fn get_fork_id<K: discv5::enr::EnrKey>(
&self,
enr: &discv5::enr::Enr<K>,
) -> Result<ForkId, Error> {
@ -551,6 +388,170 @@ pub struct DiscoveredPeer {
pub fork_id: Option<ForkId>,
}
/// Builds the local ENR with the supplied key.
pub fn build_local_enr(
sk: &SecretKey,
config: &Config,
) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
let mut builder = discv5::enr::Enr::builder();
let Config { discv5_config, fork, tcp_port, other_enr_kv_pairs, .. } = config;
let (ip_mode, socket) = match discv5_config.listen_config {
ListenConfig::Ipv4 { ip, port } => {
if ip != Ipv4Addr::UNSPECIFIED {
builder.ip4(ip);
}
builder.udp4(port);
builder.tcp4(*tcp_port);
(IpMode::Ip4, (ip, port).into())
}
ListenConfig::Ipv6 { ip, port } => {
if ip != Ipv6Addr::UNSPECIFIED {
builder.ip6(ip);
}
builder.udp6(port);
builder.tcp6(*tcp_port);
(IpMode::Ip6, (ip, port).into())
}
ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
if ipv4 != Ipv4Addr::UNSPECIFIED {
builder.ip4(ipv4);
}
builder.udp4(ipv4_port);
builder.tcp4(*tcp_port);
if ipv6 != Ipv6Addr::UNSPECIFIED {
builder.ip6(ipv6);
}
builder.udp6(ipv6_port);
(IpMode::DualStack, (ipv6, ipv6_port).into())
}
};
// identifies which network node is on
let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
*network_stack_id
});
// add other data
for (key, value) in other_enr_kv_pairs {
builder.add_value_rlp(key, value.clone().into());
}
// enr v4 not to get confused with discv4, independent versioning enr and
// discovery
let enr = builder.build(sk).expect("should build enr v4");
// backwards compatible enr
let bc_enr = NodeRecord::from_secret_key(socket, sk);
(enr, bc_enr, network_stack_id, ip_mode)
}
/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
pub async fn bootstrap(
bootstrap_nodes: HashSet<BootNode>,
discv5: &Arc<discv5::Discv5>,
) -> Result<(), Error> {
trace!(target: "net::discv5",
?bootstrap_nodes,
"adding bootstrap nodes .."
);
let mut enr_requests = vec![];
for node in bootstrap_nodes {
match node {
BootNode::Enr(node) => {
if let Err(err) = discv5.add_enr(node) {
return Err(Error::AddNodeFailed(err))
}
}
BootNode::Enode(enode) => {
let discv5 = discv5.clone();
enr_requests.push(async move {
if let Err(err) = discv5.request_enr(enode.to_string()).await {
debug!(target: "net::discv5",
?enode,
%err,
"failed adding boot node"
);
}
})
}
}
}
// If a session is established, the ENR is added straight away to discv5 kbuckets
Ok(_ = join_all(enr_requests).await)
}
/// Backgrounds regular look up queries, in order to keep kbuckets populated.
pub fn spawn_populate_kbuckets_bg(
lookup_interval: u64,
bootstrap_lookup_interval: u64,
bootstrap_lookup_countdown: u64,
metrics: Discv5Metrics,
discv5: Arc<discv5::Discv5>,
) {
task::spawn({
let local_node_id = discv5.local_enr().node_id();
let lookup_interval = Duration::from_secs(lookup_interval);
let metrics = metrics.discovered_peers;
let mut kbucket_index = MAX_KBUCKET_INDEX;
let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
// todo: graceful shutdown
async move {
// make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
// log2distance from local node
for i in (0..bootstrap_lookup_countdown).rev() {
let target = discv5::enr::NodeId::random();
trace!(target: "net::discv5",
%target,
bootstrap_boost_runs_countdown=i,
lookup_interval=format!("{:#?}", pulse_lookup_interval),
"starting bootstrap boost lookup query"
);
lookup(target, &discv5, &metrics).await;
tokio::time::sleep(pulse_lookup_interval).await;
}
// initiate regular lookups to populate kbuckets
loop {
// make sure node is connected to each subtree in the network by target
// selection (ref kademlia)
let target = get_lookup_target(kbucket_index, local_node_id);
trace!(target: "net::discv5",
%target,
lookup_interval=format!("{:#?}", lookup_interval),
"starting periodic lookup query"
);
lookup(target, &discv5, &metrics).await;
if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
// try to populate bucket one step closer
kbucket_index -= 1
} else {
// start over with bucket furthest away
kbucket_index = MAX_KBUCKET_INDEX
}
tokio::time::sleep(lookup_interval).await;
}
}
});
}
/// Gets the next lookup target, based on which bucket is currently being targeted.
pub fn get_lookup_target(
kbucket_index: usize,
@ -846,7 +847,7 @@ mod tests {
let config = Config::builder(TCP_PORT).fork(NetworkStackId::ETH, fork_id).build();
let sk = SecretKey::new(&mut thread_rng());
let (enr, _, _, _) = Discv5::build_local_enr(&sk, &config);
let (enr, _, _, _) = build_local_enr(&sk, &config);
let decoded_fork_id = enr
.get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)