From 7767b216bc7de1ce9cd496cebc3abe8a8f5b98a1 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 13 Jan 2023 10:34:13 +0100 Subject: [PATCH] feat: integrate DNS discovery service (#832) --- Cargo.lock | 2 + crates/net/dns/Cargo.toml | 1 + crates/net/dns/src/config.rs | 6 +- crates/net/dns/src/lib.rs | 87 ++++++++++++++++++++++------- crates/net/dns/src/resolver.rs | 6 +- crates/net/network/Cargo.toml | 1 + crates/net/network/src/config.rs | 50 +++++++++++++++++ crates/net/network/src/discovery.rs | 75 ++++++++++++++++++++----- crates/net/network/src/error.rs | 10 +++- crates/net/network/src/manager.rs | 5 +- 10 files changed, 200 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49c5a4306..0fd3dfb27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3749,6 +3749,7 @@ dependencies = [ "parking_lot 0.12.1", "reth-net-common", "reth-primitives", + "reth-rlp", "reth-tracing", "secp256k1 0.24.2", "thiserror", @@ -3989,6 +3990,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "reth-discv4", + "reth-dns-discovery", "reth-ecies", "reth-eth-wire", "reth-interfaces", diff --git a/crates/net/dns/Cargo.toml b/crates/net/dns/Cargo.toml index 1ee4f3116..fe3e60805 100644 --- a/crates/net/dns/Cargo.toml +++ b/crates/net/dns/Cargo.toml @@ -11,6 +11,7 @@ description = "Support for EIP-1459 Node Discovery via DNS" # reth reth-primitives = { path = "../../primitives" } reth-net-common = { path = "../common" } +reth-rlp = { path = "../../common/rlp" } # ethereum secp256k1 = { version = "0.24", features = [ diff --git a/crates/net/dns/src/config.rs b/crates/net/dns/src/config.rs index be324ced5..7d39a6856 100644 --- a/crates/net/dns/src/config.rs +++ b/crates/net/dns/src/config.rs @@ -1,4 +1,5 @@ -use std::{num::NonZeroUsize, time::Duration}; +use crate::tree::LinkEntry; +use std::{collections::HashSet, num::NonZeroUsize, time::Duration}; /// Settings for the [DnsDiscoveryClient](crate::DnsDiscoveryClient). #[derive(Debug, Clone)] @@ -17,6 +18,8 @@ pub struct DnsDiscoveryConfig { pub recheck_interval: Duration, /// Maximum number of cached DNS records. pub dns_record_cache_limit: NonZeroUsize, + /// Links to the DNS networks to bootstrap. + pub bootstrap_dns_networks: Option>, } impl Default for DnsDiscoveryConfig { @@ -26,6 +29,7 @@ impl Default for DnsDiscoveryConfig { max_requests_per_sec: NonZeroUsize::new(3).unwrap(), recheck_interval: Duration::from_secs(60 * 30), dns_record_cache_limit: NonZeroUsize::new(1_000).unwrap(), + bootstrap_dns_networks: Some(Default::default()), } } } diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index 0d749582b..eb4b45f7b 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -17,10 +17,10 @@ pub use config::DnsDiscoveryConfig; use enr::Enr; use error::ParseDnsEntryError; use lru::LruCache; -use reth_primitives::{NodeRecord, PeerId}; +use reth_primitives::{ForkId, NodeRecord, PeerId}; use secp256k1::SecretKey; use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, net::IpAddr, pin::Pin, sync::Arc, @@ -28,16 +28,19 @@ use std::{ time::{Duration, Instant}, }; use sync::SyncTree; -use tokio::sync::{ - mpsc, - mpsc::{error::TrySendError, UnboundedSender}, - oneshot, +use tokio::{ + sync::{ + mpsc, + mpsc::{error::TrySendError, UnboundedSender}, + oneshot, + }, + task::JoinHandle, }; use tokio_stream::{ wrappers::{ReceiverStream, UnboundedReceiverStream}, - Stream, + Stream, StreamExt, }; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; mod config; mod error; @@ -70,7 +73,7 @@ impl DnsDiscoveryHandle { /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s. pub async fn node_record_stream( &self, - ) -> Result, oneshot::error::RecvError> { + ) -> Result, oneshot::error::RecvError> { let (tx, rx) = oneshot::channel(); let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx); let _ = self.to_service.send(cmd); @@ -86,7 +89,7 @@ pub struct DnsDiscoveryService { /// Receiver half of the command channel. command_rx: UnboundedReceiverStream, /// All subscribers for resolved [NodeRecord]s. - node_record_listeners: Vec>, + node_record_listeners: Vec>, /// All the trees that can be synced. trees: HashMap, /// All queries currently in progress @@ -97,6 +100,8 @@ pub struct DnsDiscoveryService { queued_events: VecDeque, /// The rate at which trees should be updated. recheck_interval: Duration, + /// Links to the DNS networks to bootstrap. + bootstrap_dns_networks: HashSet, } // === impl DnsDiscoveryService === @@ -118,6 +123,7 @@ impl DnsDiscoveryService { max_requests_per_sec, recheck_interval, dns_record_cache_limit, + bootstrap_dns_networks, } = config; let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout); let (command_tx, command_rx) = mpsc::unbounded_channel(); @@ -130,6 +136,27 @@ impl DnsDiscoveryService { dns_record_cache: LruCache::new(dns_record_cache_limit), queued_events: Default::default(), recheck_interval, + bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(), + } + } + + /// Spawns this services onto a new task + /// + /// Note: requires a running runtime + pub fn spawn(mut self) -> JoinHandle<()> { + tokio::task::spawn(async move { + self.bootstrap(); + + while let Some(event) = self.next().await { + trace!(target : "disc::dns", ?event, "processed"); + } + }) + } + + /// Starts discovery with all configured bootstrap links + pub fn bootstrap(&mut self) { + for link in self.bootstrap_dns_networks.clone() { + self.sync_tree_with_link(link); } } @@ -147,7 +174,7 @@ impl DnsDiscoveryService { } /// Creates a new channel for [`NodeRecord`]s. - pub fn node_record_stream(&mut self) -> ReceiverStream { + pub fn node_record_stream(&mut self) -> ReceiverStream { let (tx, rx) = mpsc::channel(256); self.node_record_listeners.push(tx); ReceiverStream::new(rx) @@ -156,8 +183,8 @@ impl DnsDiscoveryService { /// Sends the event to all listeners. /// /// Remove channels that got closed. - fn notify(&mut self, record: NodeRecord) { - self.node_record_listeners.retain_mut(|listener| match listener.try_send(record) { + fn notify(&mut self, record: DnsNodeRecordUpdate) { + self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) { Ok(()) => true, Err(err) => match err { TrySendError::Full(_) => true, @@ -329,11 +356,20 @@ impl Stream for DnsDiscoveryService { } } +/// The converted discovered [Enr] object +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct DnsNodeRecordUpdate { + /// Discovered node and it's addresses + pub node_record: NodeRecord, + /// The forkid of the node, if present in the ENR + pub fork_id: Option, +} + /// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService] enum DnsDiscoveryCommand { /// Sync a tree SyncTree(LinkEntry), - NodeRecordUpdates(oneshot::Sender>), + NodeRecordUpdates(oneshot::Sender>), } /// Represents dns discovery related update events. @@ -344,15 +380,21 @@ pub enum DnsDiscoveryEvent { } /// Converts an [Enr] into a [NodeRecord] -fn convert_enr_node_record(enr: &Enr) -> Option { - let record = NodeRecord { +fn convert_enr_node_record(enr: &Enr) -> Option { + use reth_rlp::Decodable; + + let node_record = NodeRecord { address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, udp_port: enr.udp4().or_else(|| enr.udp6())?, id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), } .into_ipv4_mapped(); - Some(record) + + let mut maybe_fork_id = enr.get(b"eth")?; + let fork_id = ForkId::decode(&mut maybe_fork_id).ok(); + + Some(DnsNodeRecordUpdate { node_record, fork_id }) } #[cfg(test)] @@ -360,7 +402,8 @@ mod tests { use super::*; use crate::tree::TreeRootEntry; use enr::{EnrBuilder, EnrKey}; - use reth_primitives::Chain; + use reth_primitives::{Chain, Hardfork}; + use reth_rlp::Encodable; use secp256k1::rand::thread_rng; use std::{future::poll_fn, net::Ipv4Addr}; use tokio_stream::StreamExt; @@ -408,7 +451,10 @@ mod tests { resolver.insert(link.domain.clone(), root.to_string()); let mut builder = EnrBuilder::new("v4"); - builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303); + let mut buf = Vec::new(); + let fork_id = Hardfork::Frontier.fork_id(); + fork_id.encode(&mut buf); + builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303).add_value(b"eth", &buf); let enr = builder.build(&secret_key).unwrap(); resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); @@ -418,7 +464,8 @@ mod tests { let mut node_records = service.node_record_stream(); let task = tokio::task::spawn(async move { - let _ = node_records.next().await.unwrap(); + let record = node_records.next().await.unwrap(); + assert_eq!(record.fork_id, Some(fork_id)); }); service.sync_tree_with_link(link.clone()); diff --git a/crates/net/dns/src/resolver.rs b/crates/net/dns/src/resolver.rs index edc6a3e7f..007fdacea 100644 --- a/crates/net/dns/src/resolver.rs +++ b/crates/net/dns/src/resolver.rs @@ -4,10 +4,8 @@ use async_trait::async_trait; use parking_lot::RwLock; use std::collections::HashMap; use tracing::trace; -pub use trust_dns_resolver::TokioAsyncResolver; -use trust_dns_resolver::{ - error::ResolveError, proto::DnsHandle, AsyncResolver, ConnectionProvider, -}; +pub use trust_dns_resolver::{error::ResolveError, TokioAsyncResolver}; +use trust_dns_resolver::{proto::DnsHandle, AsyncResolver, ConnectionProvider}; /// A type that can lookup DNS entries #[async_trait] diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 2612776fb..5ab77dbbf 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -22,6 +22,7 @@ reth-primitives = { path = "../../primitives" } reth-net-common = { path = "../common" } reth-network-api = { path = "../network-api" } reth-discv4 = { path = "../discv4" } +reth-dns-discovery = { path = "../dns" } reth-eth-wire = { path = "../eth-wire" } reth-ecies = { path = "../ecies" } reth-rlp = { path = "../../common/rlp" } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 4bbb12042..406c403f9 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -25,6 +25,7 @@ mod __reexport { pub use secp256k1::SecretKey; } pub use __reexport::*; +use reth_dns_discovery::DnsDiscoveryConfig; use reth_ecies::util::pk2id; use reth_eth_wire::{HelloMessage, Status}; @@ -41,6 +42,8 @@ pub struct NetworkConfig { pub secret_key: SecretKey, /// All boot nodes to start network discovery with. pub boot_nodes: HashSet, + /// How to set up discovery over DNS. + pub dns_discovery_config: Option, /// How to set up discovery. pub discovery_v4_config: Option, /// Address to use for discovery @@ -124,6 +127,8 @@ pub struct NetworkConfigBuilder { client: Arc, /// The node's secret key, from which the node's identity is derived. secret_key: SecretKey, + /// How to configure discovery over DNS. + dns_discovery_config: Option, /// How to set up discovery. discovery_v4_builder: Option, /// All boot nodes to start network discovery with. @@ -164,6 +169,7 @@ impl NetworkConfigBuilder { Self { client, secret_key, + dns_discovery_config: Some(Default::default()), discovery_v4_builder: Some(Default::default()), boot_nodes: Default::default(), discovery_addr: None, @@ -274,6 +280,12 @@ impl NetworkConfigBuilder { self } + /// Sets the dns discovery config to use. + pub fn dns_discovery(mut self, config: DnsDiscoveryConfig) -> Self { + self.dns_discovery_config = Some(config); + self + } + /// Sets the boot nodes. pub fn boot_nodes(mut self, nodes: impl IntoIterator) -> Self { self.boot_nodes = nodes.into_iter().collect(); @@ -299,6 +311,7 @@ impl NetworkConfigBuilder { let Self { client, secret_key, + mut dns_discovery_config, discovery_v4_builder, boot_nodes, discovery_addr, @@ -331,10 +344,22 @@ impl NetworkConfigBuilder { ForkFilter::new(head, genesis_hash, Hardfork::all_forks()) }); + // If default DNS config is used then we add the known dns network to bootstrap from + if let Some(dns_networks) = + dns_discovery_config.as_mut().and_then(|c| c.bootstrap_dns_networks.as_mut()) + { + if dns_networks.is_empty() { + if let Some(link) = chain.public_dns_network_protocol() { + dns_networks.insert(link.parse().expect("is valid DNS link entry")); + } + } + } + NetworkConfig { client, secret_key, boot_nodes, + dns_discovery_config, discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()), discovery_addr: discovery_addr.unwrap_or_else(|| { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) @@ -376,3 +401,28 @@ impl NetworkMode { matches!(self, NetworkMode::Stake) } } + +#[cfg(test)] +mod tests { + use super::*; + use rand::thread_rng; + use reth_dns_discovery::tree::LinkEntry; + use reth_provider::test_utils::NoopProvider; + + fn builder() -> NetworkConfigBuilder { + let secret_key = SecretKey::new(&mut thread_rng()); + NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key) + } + + #[test] + fn test_network_dns_defaults() { + let config = builder().build(); + + let dns = config.dns_discovery_config.unwrap(); + let bootstrap_nodes = dns.bootstrap_dns_networks.unwrap(); + let mainnet_dns: LinkEntry = + Chain::mainnet().public_dns_network_protocol().unwrap().parse().unwrap(); + assert!(bootstrap_nodes.contains(&mainnet_dns)); + assert_eq!(bootstrap_nodes.len(), 1); + } +} diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 68c799909..07e3248b3 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -3,11 +3,15 @@ use crate::error::NetworkError; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config}; +use reth_dns_discovery::{ + DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, +}; use reth_primitives::{ForkId, NodeRecord, PeerId}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, net::{IpAddr, SocketAddr}, + sync::Arc, task::{Context, Poll}, }; use tokio::task::JoinHandle; @@ -27,10 +31,16 @@ pub struct Discovery { discv4: Option, /// All KAD table updates from the discv4 service. discv4_updates: Option>, - /// Events buffered until polled. - queued_events: VecDeque, /// The handle to the spawned discv4 service _discv4_service: Option>, + /// Handler to interact with the DNS discovery service + _dns_discovery: Option, + /// Updates from the DNS discovery service. + dns_discovery_updates: Option>, + /// The handle to the spawned DNS discovery service + _dns_disc_service: Option>, + /// Events buffered until polled. + queued_events: VecDeque, } impl Discovery { @@ -42,7 +52,9 @@ impl Discovery { discovery_addr: SocketAddr, sk: SecretKey, discv4_config: Option, + dns_discovery_config: Option, ) -> Result { + // setup discv4 let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk); let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config { let (discv4, mut discv4_service) = @@ -57,6 +69,20 @@ impl Discovery { (None, None, None) }; + // setup DNS discovery + let (_dns_discovery, dns_discovery_updates, _dns_disc_service) = + if let Some(dns_config) = dns_discovery_config { + let (mut service, dns_disc) = DnsDiscoveryService::new_pair( + Arc::new(DnsResolver::from_system_conf()?), + dns_config, + ); + let dns_discovery_updates = service.node_record_stream(); + let dns_disc_service = service.spawn(); + (Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service)) + } else { + (None, None, None) + }; + Ok(Self { local_enr, discv4, @@ -64,6 +90,9 @@ impl Discovery { _discv4_service, discovered_nodes: Default::default(), queued_events: Default::default(), + _dns_disc_service, + _dns_discovery, + dns_discovery_updates, }) } @@ -94,18 +123,23 @@ impl Discovery { self.local_enr.id } + /// Processes an incoming [NodeRecord] update from a discovery service + fn on_node_record_update(&mut self, record: NodeRecord, _fork_id: Option) { + let id = record.id; + let addr = record.tcp_addr(); + match self.discovered_nodes.entry(id) { + Entry::Occupied(_entry) => {} + Entry::Vacant(entry) => { + entry.insert(addr); + self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr)) + } + } + } + fn on_discv4_update(&mut self, update: DiscoveryUpdate) { match update { - DiscoveryUpdate::Added(node) => { - let id = node.id; - let addr = node.tcp_addr(); - match self.discovered_nodes.entry(id) { - Entry::Occupied(_entry) => {} - Entry::Vacant(entry) => { - entry.insert(addr); - self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr)) - } - } + DiscoveryUpdate::Added(record) => { + self.on_node_record_update(record, None); } DiscoveryUpdate::EnrForkId(node, fork_id) => { self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id)) @@ -128,13 +162,19 @@ impl Discovery { return Poll::Ready(event) } - // drain the update stream + // drain the update streams while let Some(Poll::Ready(Some(update))) = - self.discv4_updates.as_mut().map(|disc_updates| disc_updates.poll_next_unpin(cx)) + self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) { self.on_discv4_update(update) } + while let Some(Poll::Ready(Some(update))) = + self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) + { + self.on_node_record_update(update.node_record, update.fork_id); + } + if self.queued_events.is_empty() { return Poll::Pending } @@ -160,6 +200,9 @@ impl Discovery { discv4_updates: Default::default(), queued_events: Default::default(), _discv4_service: Default::default(), + _dns_discovery: None, + dns_discovery_updates: None, + _dns_disc_service: None, } } } @@ -185,6 +228,8 @@ mod tests { let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); let _discovery = - Discovery::new(discovery_addr, secret_key, Default::default()).await.unwrap(); + Discovery::new(discovery_addr, secret_key, Default::default(), Default::default()) + .await + .unwrap(); } } diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 4fc7f3a88..db260b8f2 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -1,6 +1,7 @@ //! Possible errors when interacting with the network. use crate::session::PendingSessionHandshakeError; +use reth_dns_discovery::resolver::ResolveError; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError}, DisconnectReason, @@ -12,10 +13,15 @@ use std::{fmt, io, io::ErrorKind}; pub enum NetworkError { /// General IO error. #[error(transparent)] - Io(#[from] std::io::Error), + Io(#[from] io::Error), /// IO error when creating the discovery service #[error("Failed to launch discovery service: {0}")] - Discovery(std::io::Error), + Discovery(io::Error), + /// Error when setting up the DNS resolver failed + /// + /// See also [DnsResolver](reth_dns_discovery::DnsResolver::from_system_conf) + #[error("Failed to configure DNS resolver: {0}")] + DnsResolver(#[from] ResolveError), } /// Abstraction over errors that can lead to a failed session diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index fe45321ab..2a13e1e4f 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -161,6 +161,7 @@ where hello_message, status, fork_filter, + dns_discovery_config, .. } = config; @@ -177,7 +178,9 @@ where disc_config }); - let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?; + let discovery = + Discovery::new(discovery_addr, secret_key, discovery_v4_config, dns_discovery_config) + .await?; // need to retrieve the addr here since provided port could be `0` let local_peer_id = discovery.local_id();