mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(discv5): plug discv5 crate into network (#7446)
This commit is contained in:
@ -1,7 +1,8 @@
|
||||
//! Interface between node identification on protocol version 5 and 4. Specifically, between types
|
||||
//! [`discv5::enr::NodeId`] and [`PeerId`].
|
||||
|
||||
use discv5::enr::{CombinedPublicKey, Enr, EnrPublicKey, NodeId};
|
||||
use discv5::enr::{CombinedPublicKey, EnrPublicKey, NodeId};
|
||||
use enr::Enr;
|
||||
use reth_primitives::{id2pk, pk2id, PeerId};
|
||||
use secp256k1::{PublicKey, SecretKey};
|
||||
|
||||
|
||||
@ -7,13 +7,13 @@ use discv5::IpMode;
|
||||
pub enum Error {
|
||||
/// Failure adding node to [`discv5::Discv5`].
|
||||
#[error("failed adding node to discv5, {0}")]
|
||||
AddNodeToDiscv5Failed(&'static str),
|
||||
AddNodeFailed(&'static str),
|
||||
/// Node record has incompatible key type.
|
||||
#[error("incompatible key type (not secp256k1)")]
|
||||
IncompatibleKeyType,
|
||||
/// Missing key used to identify rlpx network.
|
||||
#[error("fork missing on enr, 'eth' key missing")]
|
||||
ForkMissing,
|
||||
#[error("fork missing on enr, key missing")]
|
||||
ForkMissing(&'static [u8]),
|
||||
/// Failed to decode [`ForkId`](reth_primitives::ForkId) rlp value.
|
||||
#[error("failed to decode fork id, 'eth': {0:?}")]
|
||||
ForkIdDecodeError(#[from] alloy_rlp::Error),
|
||||
@ -30,9 +30,6 @@ pub enum Error {
|
||||
#[error("init failed, {0}")]
|
||||
InitFailure(&'static str),
|
||||
/// An error from underlying [`discv5::Discv5`] node.
|
||||
#[error("{0}")]
|
||||
#[error("sigp/discv5 error, {0}")]
|
||||
Discv5Error(discv5::Error),
|
||||
/// An error from underlying [`discv5::Discv5`] node.
|
||||
#[error("{0}")]
|
||||
Discv5ErrorStr(&'static str),
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@ use std::{
|
||||
|
||||
use ::enr::Enr;
|
||||
use alloy_rlp::Decodable;
|
||||
use derive_more::Deref;
|
||||
use discv5::ListenConfig;
|
||||
use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
|
||||
use futures::future::join_all;
|
||||
@ -46,9 +45,8 @@ use metrics::Discv5Metrics;
|
||||
const MAX_LOG2_DISTANCE: usize = 255;
|
||||
|
||||
/// Transparent wrapper around [`discv5::Discv5`].
|
||||
#[derive(Deref, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct Discv5 {
|
||||
#[deref]
|
||||
/// sigp/discv5 node.
|
||||
discv5: Arc<discv5::Discv5>,
|
||||
/// [`IpMode`] of the the node.
|
||||
@ -67,9 +65,9 @@ impl Discv5 {
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Adds the node to the table, if it is not already present.
|
||||
pub fn add_node_to_routing_table(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
|
||||
pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
|
||||
let EnrCombinedKeyWrapper(enr) = node_record.into();
|
||||
self.add_enr(enr).map_err(Error::AddNodeToDiscv5Failed)
|
||||
self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
|
||||
}
|
||||
|
||||
/// Sets the pair in the EIP-868 [`Enr`] of the node.
|
||||
@ -85,7 +83,7 @@ impl Discv5 {
|
||||
);
|
||||
return
|
||||
};
|
||||
if let Err(err) = self.enr_insert(key_str, &rlp) {
|
||||
if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
|
||||
error!(target: "discv5",
|
||||
%err,
|
||||
"failed to update local enr"
|
||||
@ -109,11 +107,11 @@ impl Discv5 {
|
||||
/// Adds the peer and id to the ban list.
|
||||
///
|
||||
/// This will prevent any future inclusion in the table
|
||||
pub fn ban_peer_by_ip_and_node_id(&self, peer_id: PeerId, ip: IpAddr) {
|
||||
pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
|
||||
match discv4_id_to_discv5_id(peer_id) {
|
||||
Ok(node_id) => {
|
||||
self.ban_node(&node_id, None);
|
||||
self.ban_peer_by_ip(ip);
|
||||
self.discv5.ban_node(&node_id, None);
|
||||
self.ban_ip(ip);
|
||||
}
|
||||
Err(err) => error!(target: "discv5",
|
||||
%err,
|
||||
@ -125,15 +123,15 @@ impl Discv5 {
|
||||
/// Adds the ip to the ban list.
|
||||
///
|
||||
/// This will prevent any future inclusion in the table
|
||||
pub fn ban_peer_by_ip(&self, ip: IpAddr) {
|
||||
self.ban_ip(ip, None);
|
||||
pub fn ban_ip(&self, ip: IpAddr) {
|
||||
self.discv5.ban_ip(ip, None);
|
||||
}
|
||||
|
||||
/// Returns the [`NodeRecord`] of the local node.
|
||||
///
|
||||
/// This includes the currently tracked external IP address of the node.
|
||||
pub fn node_record(&self) -> NodeRecord {
|
||||
let enr: Enr<_> = EnrCombinedKeyWrapper(self.local_enr()).into();
|
||||
let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
|
||||
(&enr).try_into().unwrap()
|
||||
}
|
||||
|
||||
@ -238,7 +236,7 @@ impl Discv5 {
|
||||
//
|
||||
// 4. add boot nodes
|
||||
//
|
||||
Self::bootstrap(bootstrap_nodes, &discv5)?;
|
||||
Self::bootstrap(bootstrap_nodes, &discv5).await?;
|
||||
|
||||
let metrics = Discv5Metrics::default();
|
||||
|
||||
@ -255,7 +253,7 @@ impl Discv5 {
|
||||
}
|
||||
|
||||
/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
|
||||
fn bootstrap(
|
||||
async fn bootstrap(
|
||||
bootstrap_nodes: HashSet<BootNode>,
|
||||
discv5: &Arc<discv5::Discv5>,
|
||||
) -> Result<(), Error> {
|
||||
@ -269,7 +267,7 @@ impl Discv5 {
|
||||
match node {
|
||||
BootNode::Enr(node) => {
|
||||
if let Err(err) = discv5.add_enr(node) {
|
||||
return Err(Error::Discv5ErrorStr(err))
|
||||
return Err(Error::AddNodeFailed(err))
|
||||
}
|
||||
}
|
||||
BootNode::Enode(enode) => {
|
||||
@ -286,18 +284,8 @@ impl Discv5 {
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = join_all(enr_requests);
|
||||
|
||||
debug!(target: "net::discv5",
|
||||
nodes=format!("[{:#}]", discv5.with_kbuckets(|kbuckets| kbuckets
|
||||
.write()
|
||||
.iter()
|
||||
.map(|peer| format!("enr: {:?}, status: {:?}", peer.node.value, peer.status)).collect::<Vec<_>>()
|
||||
).into_iter().format(", ")),
|
||||
"added boot nodes"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
Ok(_ = join_all(enr_requests).await)
|
||||
}
|
||||
|
||||
/// Backgrounds regular look up queries, in order to keep kbuckets populated.
|
||||
@ -479,7 +467,8 @@ impl Discv5 {
|
||||
&self,
|
||||
enr: &discv5::enr::Enr<K>,
|
||||
) -> Result<ForkId, Error> {
|
||||
let mut fork_id_bytes = enr.get_raw_rlp(self.fork_id_key()).ok_or(Error::ForkMissing)?;
|
||||
let key = self.fork_id_key;
|
||||
let mut fork_id_bytes = enr.get_raw_rlp(key).ok_or(Error::ForkMissing(key))?;
|
||||
|
||||
Ok(ForkId::decode(&mut fork_id_bytes)?)
|
||||
}
|
||||
@ -491,9 +480,9 @@ impl Discv5 {
|
||||
/// Exposes API of [`discv5::Discv5`].
|
||||
pub fn with_discv5<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&Self) -> R,
|
||||
F: FnOnce(&discv5::Discv5) -> R,
|
||||
{
|
||||
f(self)
|
||||
f(&self.discv5)
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -617,7 +606,7 @@ mod tests {
|
||||
// add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets)
|
||||
let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
|
||||
EnrCombinedKeyWrapper(node_2_enr.clone()).into();
|
||||
node_1.add_node_to_routing_table(node_2_enr_reth_compatible_ty).unwrap();
|
||||
node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();
|
||||
|
||||
// verify node_2 is in KBuckets of node_1:discv5
|
||||
assert!(
|
||||
@ -630,21 +619,21 @@ mod tests {
|
||||
// verify node_1:discv5 is connected to node_2:discv5 and vv
|
||||
let event_2_v5 = stream_2.recv().await.unwrap();
|
||||
let event_1_v5 = stream_1.recv().await.unwrap();
|
||||
matches!(
|
||||
assert!(matches!(
|
||||
event_1_v5,
|
||||
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
|
||||
);
|
||||
matches!(
|
||||
));
|
||||
assert!(matches!(
|
||||
event_2_v5,
|
||||
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
|
||||
);
|
||||
));
|
||||
|
||||
// verify node_1 is in KBuckets of node_2:discv5
|
||||
let event_2_v5 = stream_2.recv().await.unwrap();
|
||||
matches!(
|
||||
assert!(matches!(
|
||||
event_2_v5,
|
||||
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@ -29,7 +29,7 @@ reth-rpc-types.workspace = true
|
||||
reth-tokio-util.workspace = true
|
||||
|
||||
# ethereum
|
||||
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
|
||||
enr = { workspace = true, features = ["serde", "rust-secp256k1"] }
|
||||
alloy-rlp.workspace = true
|
||||
discv5.workspace = true
|
||||
|
||||
@ -84,8 +84,6 @@ alloy-node-bindings.workspace = true
|
||||
ethers-core = { workspace = true, default-features = false }
|
||||
ethers-providers = { workspace = true, default-features = false, features = ["ws"] }
|
||||
|
||||
enr = { workspace = true, features = ["serde", "rust-secp256k1"] }
|
||||
|
||||
# misc
|
||||
serial_test.workspace = true
|
||||
tempfile.workspace = true
|
||||
@ -96,10 +94,9 @@ criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
|
||||
|
||||
[features]
|
||||
default = ["serde"]
|
||||
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"]
|
||||
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json"]
|
||||
test-utils = [
|
||||
"reth-provider/test-utils",
|
||||
"dep:enr",
|
||||
"dep:tempfile",
|
||||
"reth-transaction-pool/test-utils",
|
||||
]
|
||||
|
||||
@ -43,12 +43,12 @@ pub struct NetworkConfig<C> {
|
||||
pub boot_nodes: HashSet<NodeRecord>,
|
||||
/// How to set up discovery over DNS.
|
||||
pub dns_discovery_config: Option<DnsDiscoveryConfig>,
|
||||
/// Address to use for discovery v4.
|
||||
pub discovery_v4_addr: SocketAddr,
|
||||
/// How to set up discovery.
|
||||
pub discovery_v4_config: Option<Discv4Config>,
|
||||
/// How to set up discovery version 5.
|
||||
pub discovery_v5_config: Option<reth_discv5::Config>,
|
||||
/// Address to use for discovery
|
||||
pub discovery_addr: SocketAddr,
|
||||
/// Address to listen for incoming connections
|
||||
pub listener_addr: SocketAddr,
|
||||
/// How to instantiate peer manager.
|
||||
@ -155,10 +155,8 @@ impl<C> NetworkConfig<C> {
|
||||
}
|
||||
|
||||
/// Sets the config to use for the discovery v5 protocol.
|
||||
|
||||
pub fn set_discovery_v5(mut self, discv5_config: reth_discv5::Config) -> Self {
|
||||
self.discovery_v5_config = Some(discv5_config);
|
||||
self.discovery_addr = self.discovery_v5_config.as_ref().unwrap().discovery_socket();
|
||||
self
|
||||
}
|
||||
|
||||
@ -567,7 +565,7 @@ impl NetworkConfigBuilder {
|
||||
dns_discovery_config,
|
||||
discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()),
|
||||
discovery_v5_config: None,
|
||||
discovery_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS),
|
||||
discovery_v4_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS),
|
||||
listener_addr,
|
||||
peers_config: peers_config.unwrap_or_default(),
|
||||
sessions_config: sessions_config.unwrap_or_default(),
|
||||
|
||||
@ -1,18 +1,21 @@
|
||||
//! Discovery support for the network.
|
||||
|
||||
use crate::{
|
||||
cache::LruMap,
|
||||
error::{NetworkError, ServiceKind},
|
||||
manager::DiscoveredEvent,
|
||||
};
|
||||
use enr::Enr;
|
||||
use futures::StreamExt;
|
||||
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry};
|
||||
use reth_discv5::{DiscoveredPeer, Discv5};
|
||||
use reth_dns_discovery::{
|
||||
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
|
||||
};
|
||||
use reth_primitives::{ForkId, NodeRecord, PeerId};
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap, VecDeque},
|
||||
collections::VecDeque,
|
||||
net::{IpAddr, SocketAddr},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
@ -20,6 +23,12 @@ use std::{
|
||||
};
|
||||
use tokio::{sync::mpsc, task::JoinHandle};
|
||||
use tokio_stream::{wrappers::ReceiverStream, Stream};
|
||||
use tracing::trace;
|
||||
|
||||
/// Default max capacity for cache of discovered peers.
|
||||
///
|
||||
/// Default is 10 000 peers.
|
||||
pub const DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE: u32 = 10_000;
|
||||
|
||||
/// An abstraction over the configured discovery protocol.
|
||||
///
|
||||
@ -30,8 +39,8 @@ pub struct Discovery {
|
||||
/// All nodes discovered via discovery protocol.
|
||||
///
|
||||
/// These nodes can be ephemeral and are updated via the discovery protocol.
|
||||
discovered_nodes: HashMap<PeerId, SocketAddr>,
|
||||
/// Local ENR of the discovery service.
|
||||
discovered_nodes: LruMap<PeerId, SocketAddr>,
|
||||
/// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]).
|
||||
local_enr: NodeRecord,
|
||||
/// Handler to interact with the Discovery v4 service
|
||||
discv4: Option<Discv4>,
|
||||
@ -39,6 +48,10 @@ pub struct Discovery {
|
||||
discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
|
||||
/// The handle to the spawned discv4 service
|
||||
_discv4_service: Option<JoinHandle<()>>,
|
||||
/// Handler to interact with the Discovery v5 service
|
||||
discv5: Option<Discv5>,
|
||||
/// All KAD table updates from the discv5 service.
|
||||
discv5_updates: Option<ReceiverStream<discv5::Event>>,
|
||||
/// Handler to interact with the DNS discovery service
|
||||
_dns_discovery: Option<DnsDiscoveryHandle>,
|
||||
/// Updates from the DNS discovery service.
|
||||
@ -57,24 +70,42 @@ impl Discovery {
|
||||
/// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener
|
||||
/// channel to receive all discovered nodes.
|
||||
pub async fn new(
|
||||
discovery_addr: SocketAddr,
|
||||
discovery_v4_addr: SocketAddr,
|
||||
sk: SecretKey,
|
||||
discv4_config: Option<Discv4Config>,
|
||||
discv5_config: Option<reth_discv5::Config>, // contains discv5 listen address
|
||||
dns_discovery_config: Option<DnsDiscoveryConfig>,
|
||||
) -> Result<Self, NetworkError> {
|
||||
// setup discv4
|
||||
let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk);
|
||||
let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config {
|
||||
let (discv4, mut discv4_service) =
|
||||
Discv4::bind(discovery_addr, local_enr, sk, disc_config).await.map_err(|err| {
|
||||
NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_addr))
|
||||
})?;
|
||||
let discv4_updates = discv4_service.update_stream();
|
||||
// spawn the service
|
||||
let _discv4_service = discv4_service.spawn();
|
||||
(Some(discv4), Some(discv4_updates), Some(_discv4_service))
|
||||
} else {
|
||||
(None, None, None)
|
||||
let local_enr = NodeRecord::from_secret_key(discovery_v4_addr, &sk);
|
||||
let (discv4, discv4_updates, _discv4_service) = match discv4_config {
|
||||
Some(disc_config) => {
|
||||
let (discv4, mut discv4_service) =
|
||||
Discv4::bind(discovery_v4_addr, local_enr, sk, disc_config).await.map_err(
|
||||
|err| {
|
||||
NetworkError::from_io_error(
|
||||
err,
|
||||
ServiceKind::Discovery(discovery_v4_addr),
|
||||
)
|
||||
},
|
||||
)?;
|
||||
let discv4_updates = discv4_service.update_stream();
|
||||
// spawn the service
|
||||
let _discv4_service = discv4_service.spawn();
|
||||
|
||||
(Some(discv4), Some(discv4_updates), Some(_discv4_service))
|
||||
}
|
||||
None => (None, None, None),
|
||||
};
|
||||
|
||||
let (discv5, discv5_updates) = match discv5_config {
|
||||
Some(config) => {
|
||||
let (discv5, discv5_updates, _local_enr_discv5) =
|
||||
Discv5::start(&sk, config).await?;
|
||||
|
||||
(Some(discv5), Some(discv5_updates.into()))
|
||||
}
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
// setup DNS discovery
|
||||
@ -97,7 +128,9 @@ impl Discovery {
|
||||
discv4,
|
||||
discv4_updates,
|
||||
_discv4_service,
|
||||
discovered_nodes: Default::default(),
|
||||
discv5,
|
||||
discv5_updates,
|
||||
discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE),
|
||||
queued_events: Default::default(),
|
||||
_dns_disc_service,
|
||||
_dns_discovery,
|
||||
@ -122,6 +155,7 @@ impl Discovery {
|
||||
// use forward-compatible forkid entry
|
||||
discv4.set_eip868_rlp("eth".as_bytes().to_vec(), EnrForkIdEntry::from(fork_id))
|
||||
}
|
||||
// todo: update discv5 enr
|
||||
}
|
||||
|
||||
/// Bans the [`IpAddr`] in the discovery service.
|
||||
@ -129,6 +163,9 @@ impl Discovery {
|
||||
if let Some(discv4) = &self.discv4 {
|
||||
discv4.ban_ip(ip)
|
||||
}
|
||||
if let Some(discv5) = &self.discv5 {
|
||||
discv5.ban_ip(ip)
|
||||
}
|
||||
}
|
||||
|
||||
/// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
|
||||
@ -136,6 +173,9 @@ impl Discovery {
|
||||
if let Some(discv4) = &self.discv4 {
|
||||
discv4.ban(peer_id, ip)
|
||||
}
|
||||
if let Some(discv5) = &self.discv5 {
|
||||
discv5.ban(peer_id, ip)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a shared reference to the discv4.
|
||||
@ -143,9 +183,9 @@ impl Discovery {
|
||||
self.discv4.clone()
|
||||
}
|
||||
|
||||
/// Returns the id with which the local identifies itself in the network
|
||||
/// Returns the id with which the local node identifies itself in the network
|
||||
pub(crate) fn local_id(&self) -> PeerId {
|
||||
self.local_enr.id
|
||||
self.local_enr.id // local discv4 and discv5 have same id, since signed with same secret key
|
||||
}
|
||||
|
||||
/// Add a node to the discv4 table.
|
||||
@ -155,19 +195,27 @@ impl Discovery {
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a node to the discv4 table.
|
||||
pub(crate) fn add_discv5_node(&self, enr: Enr<SecretKey>) -> Result<(), NetworkError> {
|
||||
if let Some(discv5) = &self.discv5 {
|
||||
discv5.add_node(enr).map_err(NetworkError::Discv5Error)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes an incoming [NodeRecord] update from a discovery service
|
||||
fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
|
||||
let id = record.id;
|
||||
let addr = record.tcp_addr();
|
||||
match self.discovered_nodes.entry(id) {
|
||||
Entry::Occupied(_entry) => {}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(addr);
|
||||
_ =
|
||||
self.discovered_nodes.get_or_insert(id, || {
|
||||
self.queued_events.push_back(DiscoveryEvent::NewNode(
|
||||
DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id },
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
addr
|
||||
})
|
||||
}
|
||||
|
||||
fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
|
||||
@ -200,17 +248,37 @@ impl Discovery {
|
||||
return Poll::Ready(event)
|
||||
}
|
||||
|
||||
// drain the update streams
|
||||
// drain the discv4 update stream
|
||||
while let Some(Poll::Ready(Some(update))) =
|
||||
self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
|
||||
{
|
||||
self.on_discv4_update(update)
|
||||
}
|
||||
|
||||
// drain the discv5 update stream
|
||||
while let Some(Poll::Ready(Some(update))) =
|
||||
self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
|
||||
{
|
||||
if let Some(discv5) = self.discv5.as_mut() {
|
||||
if let Some(DiscoveredPeer { node_record, fork_id }) =
|
||||
discv5.on_discv5_update(update)
|
||||
{
|
||||
self.on_node_record_update(node_record, fork_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drain the dns update stream
|
||||
while let Some(Poll::Ready(Some(update))) =
|
||||
self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
|
||||
{
|
||||
self.add_discv4_node(update.node_record);
|
||||
if let Err(err) = self.add_discv5_node(update.enr) {
|
||||
trace!(target: "net::discovery",
|
||||
%err,
|
||||
"failed adding node discovered by dns to discv5"
|
||||
);
|
||||
}
|
||||
self.on_node_record_update(update.node_record, update.fork_id);
|
||||
}
|
||||
|
||||
@ -239,7 +307,7 @@ impl Discovery {
|
||||
mpsc::unbounded_channel();
|
||||
|
||||
Self {
|
||||
discovered_nodes: Default::default(),
|
||||
discovered_nodes: LruMap::new(0),
|
||||
local_enr: NodeRecord {
|
||||
address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
|
||||
tcp_port: 0,
|
||||
@ -248,6 +316,8 @@ impl Discovery {
|
||||
},
|
||||
discv4: Default::default(),
|
||||
discv4_updates: Default::default(),
|
||||
discv5: None,
|
||||
discv5_updates: None,
|
||||
queued_events: Default::default(),
|
||||
_discv4_service: Default::default(),
|
||||
_dns_discovery: None,
|
||||
@ -259,7 +329,7 @@ impl Discovery {
|
||||
}
|
||||
|
||||
/// Events produced by the [`Discovery`] manager.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum DiscoveryEvent {
|
||||
/// Discovered a node
|
||||
NewNode(DiscoveredEvent),
|
||||
@ -279,9 +349,119 @@ mod tests {
|
||||
let mut rng = thread_rng();
|
||||
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
|
||||
let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
|
||||
let _discovery =
|
||||
Discovery::new(discovery_addr, secret_key, Default::default(), Default::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let _discovery = Discovery::new(
|
||||
discovery_addr,
|
||||
secret_key,
|
||||
Default::default(),
|
||||
None,
|
||||
Default::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
use reth_discv4::Discv4ConfigBuilder;
|
||||
use reth_discv5::{enr::EnrCombinedKeyWrapper, enr_to_discv4_id};
|
||||
use tracing::trace;
|
||||
|
||||
async fn start_discovery_node(udp_port_discv4: u16, udp_port_discv5: u16) -> Discovery {
|
||||
let secret_key = SecretKey::new(&mut thread_rng());
|
||||
|
||||
let discv4_addr = format!("127.0.0.1:{udp_port_discv4}").parse().unwrap();
|
||||
let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
|
||||
|
||||
// disable `NatResolver`
|
||||
let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();
|
||||
|
||||
let discv5_listen_config = discv5::ListenConfig::from(discv5_addr);
|
||||
let discv5_config = reth_discv5::Config::builder(0)
|
||||
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
|
||||
.build();
|
||||
|
||||
Discovery::new(discv4_addr, secret_key, Some(discv4_config), Some(discv5_config), None)
|
||||
.await
|
||||
.expect("should build discv5 with discv4 downgrade")
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn discv5_and_discv4_same_pk() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
// set up test
|
||||
let mut node_1 = start_discovery_node(40014, 40015).await;
|
||||
let discv4_enr_1 = node_1.discv4.as_ref().unwrap().node_record();
|
||||
let discv5_enr_node_1 =
|
||||
node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
|
||||
let discv4_id_1 = discv4_enr_1.id;
|
||||
let discv5_id_1 = discv5_enr_node_1.node_id();
|
||||
|
||||
let mut node_2 = start_discovery_node(40024, 40025).await;
|
||||
let discv4_enr_2 = node_2.discv4.as_ref().unwrap().node_record();
|
||||
let discv5_enr_node_2 =
|
||||
node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
|
||||
let discv4_id_2 = discv4_enr_2.id;
|
||||
let discv5_id_2 = discv5_enr_node_2.node_id();
|
||||
|
||||
trace!(target: "net::discovery::tests",
|
||||
node_1_node_id=format!("{:#}", discv5_id_1),
|
||||
node_2_node_id=format!("{:#}", discv5_id_2),
|
||||
"started nodes"
|
||||
);
|
||||
|
||||
// test
|
||||
|
||||
// assert discovery version 4 and version 5 nodes have same id
|
||||
assert_eq!(discv4_id_1, enr_to_discv4_id(&discv5_enr_node_1).unwrap());
|
||||
assert_eq!(discv4_id_2, enr_to_discv4_id(&discv5_enr_node_2).unwrap());
|
||||
|
||||
// add node_2:discv4 manually to node_1:discv4
|
||||
node_1.add_discv4_node(discv4_enr_2);
|
||||
|
||||
// verify node_2:discv4 discovered node_1:discv4 and vv
|
||||
let event_node_1 = node_1.next().await.unwrap();
|
||||
let event_node_2 = node_2.next().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
|
||||
peer_id: discv4_id_2,
|
||||
socket_addr: discv4_enr_2.tcp_addr(),
|
||||
fork_id: None
|
||||
}),
|
||||
event_node_1
|
||||
);
|
||||
assert_eq!(
|
||||
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
|
||||
peer_id: discv4_id_1,
|
||||
socket_addr: discv4_enr_1.tcp_addr(),
|
||||
fork_id: None
|
||||
}),
|
||||
event_node_2
|
||||
);
|
||||
|
||||
assert_eq!(1, node_1.discovered_nodes.len());
|
||||
assert_eq!(1, node_2.discovered_nodes.len());
|
||||
|
||||
// add node_2:discv5 to node_1:discv5, manual insertion won't emit an event
|
||||
node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_node_2.clone()).into()).unwrap();
|
||||
// verify node_2 is in KBuckets of node_1:discv5
|
||||
assert!(node_1
|
||||
.discv5
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.with_discv5(|discv5| discv5.table_entries_id().contains(&discv5_id_2)));
|
||||
|
||||
// manually trigger connection from node_1:discv5 to node_2:discv5
|
||||
node_1
|
||||
.discv5
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.with_discv5(|discv5| discv5.send_ping(discv5_enr_node_2.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// this won't emit an event, since the nodes already discovered each other on discv4, the
|
||||
// number of nodes stored for each node on this level remains 1.
|
||||
assert_eq!(1, node_1.discovered_nodes.len());
|
||||
assert_eq!(1, node_2.discovered_nodes.len());
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +53,9 @@ pub enum NetworkError {
|
||||
/// IO error when creating the discovery service
|
||||
#[error("failed to launch discovery service: {0}")]
|
||||
Discovery(io::Error),
|
||||
/// An error occurred with discovery v5 node.
|
||||
#[error("discv5 error, {0}")]
|
||||
Discv5Error(#[from] reth_discv5::Error),
|
||||
/// Error when setting up the DNS resolver failed
|
||||
///
|
||||
/// See also [DnsResolver](reth_dns_discovery::DnsResolver::from_system_conf)
|
||||
|
||||
@ -177,8 +177,9 @@ where
|
||||
let NetworkConfig {
|
||||
client,
|
||||
secret_key,
|
||||
discovery_v4_addr,
|
||||
mut discovery_v4_config,
|
||||
discovery_addr,
|
||||
discovery_v5_config,
|
||||
listener_addr,
|
||||
peers_config,
|
||||
sessions_config,
|
||||
@ -195,7 +196,7 @@ where
|
||||
tx_gossip_disabled,
|
||||
#[cfg(feature = "optimism")]
|
||||
optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint },
|
||||
..
|
||||
transactions_manager_config: _,
|
||||
} = config;
|
||||
|
||||
let peers_manager = PeersManager::new(peers_config);
|
||||
@ -213,9 +214,14 @@ where
|
||||
disc_config
|
||||
});
|
||||
|
||||
let discovery =
|
||||
Discovery::new(discovery_addr, secret_key, discovery_v4_config, dns_discovery_config)
|
||||
.await?;
|
||||
let discovery = Discovery::new(
|
||||
discovery_v4_addr,
|
||||
secret_key,
|
||||
discovery_v4_config,
|
||||
discovery_v5_config,
|
||||
dns_discovery_config,
|
||||
)
|
||||
.await?;
|
||||
// need to retrieve the addr here since provided port could be `0`
|
||||
let local_peer_id = discovery.local_id();
|
||||
let discv4 = discovery.discv4();
|
||||
@ -1025,7 +1031,7 @@ pub enum NetworkEvent {
|
||||
PeerRemoved(PeerId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum DiscoveredEvent {
|
||||
EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
|
||||
}
|
||||
|
||||
@ -59,8 +59,8 @@ async fn test_discovery_addr_in_use() {
|
||||
let any_port_listener = TcpListener::bind(addr).await.unwrap();
|
||||
let port = any_port_listener.local_addr().unwrap().port();
|
||||
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port));
|
||||
let _discovery = Discovery::new(addr, secret_key, Some(disc_config), None).await.unwrap();
|
||||
let _discovery = Discovery::new(addr, secret_key, Some(disc_config), None, None).await.unwrap();
|
||||
let disc_config = Discv4Config::default();
|
||||
let result = Discovery::new(addr, secret_key, Some(disc_config), None).await;
|
||||
let result = Discovery::new(addr, secret_key, Some(disc_config), None, None).await;
|
||||
assert!(is_addr_in_use_kind(&result.err().unwrap(), ServiceKind::Discovery(addr)));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user