diff --git a/Cargo.lock b/Cargo.lock index 620a8dc16..c0f8412a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,7 +1103,7 @@ dependencies = [ "hex", "hkdf", "lazy_static", - "lru", + "lru 0.7.8", "more-asserts", "parking_lot 0.11.2", "rand 0.8.5", @@ -2556,6 +2556,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" +dependencies = [ + "hashbrown 0.13.1", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -3734,7 +3743,12 @@ dependencies = [ "bytes", "data-encoding", "enr 0.7.0", + "linked_hash_set", + "lru 0.9.0", + "parking_lot 0.12.1", + "reth-net-common", "reth-primitives", + "reth-tracing", "secp256k1 0.24.2", "thiserror", "tokio", diff --git a/crates/net/dns/Cargo.toml b/crates/net/dns/Cargo.toml index 8138a8ac2..1ee4f3116 100644 --- a/crates/net/dns/Cargo.toml +++ b/crates/net/dns/Cargo.toml @@ -10,6 +10,7 @@ description = "Support for EIP-1459 Node Discovery via DNS" [dependencies] # reth reth-primitives = { path = "../../primitives" } +reth-net-common = { path = "../common" } # ethereum secp256k1 = { version = "0.24", features = [ @@ -28,7 +29,15 @@ trust-dns-resolver = "0.22" # misc data-encoding = "2" +async-trait = "0.1" bytes = "1.2" -tracing = "0.1" +linked_hash_set = "0.1" +lru = "0.9" thiserror = "1.0" -async-trait = "0.1.61" +tracing = "0.1" +parking_lot = "0.12" + + +[dev-dependencies] +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread"] } +reth-tracing = { path = "../../tracing" } diff --git a/crates/net/dns/src/config.rs b/crates/net/dns/src/config.rs index 2f7e3d9f1..be324ced5 100644 --- a/crates/net/dns/src/config.rs +++ b/crates/net/dns/src/config.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZeroUsize, time::Duration}; /// Settings for the [DnsDiscoveryClient](crate::DnsDiscoveryClient). #[derive(Debug, Clone)] @@ -7,17 +7,25 @@ pub struct DnsDiscoveryConfig { /// /// Default: 5s pub lookup_timeout: Duration, - /// The rate at which lookups should be re-triggered. + /// The DNS request rate limit + /// + /// Default: 3 + pub max_requests_per_sec: NonZeroUsize, + /// The rate at which trees should be updated. /// /// Default: 30min - pub lookup_interval: Duration, + pub recheck_interval: Duration, + /// Maximum number of cached DNS records. + pub dns_record_cache_limit: NonZeroUsize, } impl Default for DnsDiscoveryConfig { fn default() -> Self { Self { lookup_timeout: Duration::from_secs(5), - lookup_interval: Duration::from_secs(60 * 30), + max_requests_per_sec: NonZeroUsize::new(3).unwrap(), + recheck_interval: Duration::from_secs(60 * 30), + dns_record_cache_limit: NonZeroUsize::new(1_000).unwrap(), } } } diff --git a/crates/net/dns/src/error.rs b/crates/net/dns/src/error.rs new file mode 100644 index 000000000..3b6d6bf85 --- /dev/null +++ b/crates/net/dns/src/error.rs @@ -0,0 +1,38 @@ +use crate::tree::TreeRootEntry; + +/// Alias for a parse result +pub(crate) type ParseEntryResult = Result; + +pub(crate) type LookupResult = Result; + +/// Error while parsing a [DnsEntry] +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum ParseDnsEntryError { + #[error("Unknown entry: {0}")] + UnknownEntry(String), + #[error("Field {0} not found.")] + FieldNotFound(&'static str), + #[error("Base64 decoding failed: {0}")] + Base64DecodeError(String), + #[error("Base32 decoding failed: {0}")] + Base32DecodeError(String), + #[error("{0}")] + RlpDecodeError(String), + #[error("{0}")] + Other(String), +} + +/// Errors that can happen during lookups +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub(crate) enum LookupError { + #[error(transparent)] + Parse(#[from] ParseDnsEntryError), + #[error("Failed to verify root {0}")] + InvalidRoot(TreeRootEntry), + #[error("Request timed out")] + RequestTimedOut, + #[error("Entry not found")] + EntryNotFound, +} diff --git a/crates/net/dns/src/lib.rs b/crates/net/dns/src/lib.rs index 19615b502..0d749582b 100644 --- a/crates/net/dns/src/lib.rs +++ b/crates/net/dns/src/lib.rs @@ -4,30 +4,48 @@ no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -// TODO rm later -#![allow(missing_docs, unreachable_pub, unused)] //! Implementation of [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459) Node Discovery via DNS. -use std::{ - collections::HashMap, - sync::Arc, - task::{Context, Poll}, +pub use crate::resolver::{DnsResolver, MapResolver, Resolver}; +use crate::{ + query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult}, + sync::{ResolveKind, SyncAction}, + tree::{DnsEntry, LinkEntry}, }; -use tokio::sync::{mpsc, mpsc::UnboundedSender}; -use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; +pub use config::DnsDiscoveryConfig; +use enr::Enr; +use error::ParseDnsEntryError; +use lru::LruCache; +use reth_primitives::{NodeRecord, PeerId}; +use secp256k1::SecretKey; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + net::IpAddr, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, + time::{Duration, Instant}, +}; +use sync::SyncTree; +use tokio::sync::{ + mpsc, + mpsc::{error::TrySendError, UnboundedSender}, + oneshot, +}; +use tokio_stream::{ + wrappers::{ReceiverStream, UnboundedReceiverStream}, + Stream, +}; +use tracing::{debug, warn}; mod config; +mod error; +mod query; pub mod resolver; mod sync; pub mod tree; -use crate::{ - sync::SyncTree, - tree::{LinkEntry, ParseDnsEntryError}, -}; -pub use config::DnsDiscoveryConfig; - /// [DnsDiscoveryService] front-end. #[derive(Clone)] pub struct DnsDiscoveryHandle { @@ -37,33 +55,88 @@ pub struct DnsDiscoveryHandle { // === impl DnsDiscovery === -impl DnsDiscoveryHandle {} +impl DnsDiscoveryHandle { + /// Starts syncing the given link to a tree. + pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> { + self.sync_tree_with_link(link.parse()?); + Ok(()) + } + + /// Starts syncing the given link to a tree. + pub fn sync_tree_with_link(&mut self, link: LinkEntry) { + let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link)); + } + + /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s. + pub async fn node_record_stream( + &self, + ) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx); + let _ = self.to_service.send(cmd); + rx.await + } +} /// A client that discovers nodes via DNS. #[must_use = "Service does nothing unless polled"] -pub struct DnsDiscoveryService { +pub struct DnsDiscoveryService { /// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand. command_tx: UnboundedSender, /// Receiver half of the command channel. command_rx: UnboundedReceiverStream, - /// All subscribers for event updates. - event_listener: Vec>, + /// All subscribers for resolved [NodeRecord]s. + node_record_listeners: Vec>, /// All the trees that can be synced. - trees: HashMap, SyncTree>, + trees: HashMap, + /// All queries currently in progress + queries: QueryPool, + /// Cached dns records + dns_record_cache: LruCache>, + /// all buffered events + queued_events: VecDeque, + /// The rate at which trees should be updated. + recheck_interval: Duration, } // === impl DnsDiscoveryService === -impl DnsDiscoveryService { +impl DnsDiscoveryService { /// Creates a new instance of the [DnsDiscoveryService] using the given settings. - pub fn new(_config: DnsDiscoveryConfig) -> Self { - todo!() + /// + /// ``` + /// use std::sync::Arc; + /// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver}; + /// # fn t() { + /// let service = + /// DnsDiscoveryService::new(Arc::new(DnsResolver::from_system_conf().unwrap()), Default::default()); + /// # } + /// ``` + pub fn new(resolver: Arc, config: DnsDiscoveryConfig) -> Self { + let DnsDiscoveryConfig { + lookup_timeout, + max_requests_per_sec, + recheck_interval, + dns_record_cache_limit, + } = config; + let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + Self { + command_tx, + command_rx: UnboundedReceiverStream::new(command_rx), + node_record_listeners: Default::default(), + trees: Default::default(), + queries, + dns_record_cache: LruCache::new(dns_record_cache_limit), + queued_events: Default::default(), + recheck_interval, + } } /// Same as [DnsDiscoveryService::new] but also returns a new handle that's connected to the /// service - pub fn new_pair(config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { - let service = Self::new(config); + pub fn new_pair(resolver: Arc, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) { + let service = Self::new(resolver, config); let handle = service.handle(); (service, handle) } @@ -73,38 +146,368 @@ impl DnsDiscoveryService { DnsDiscoveryHandle { to_service: self.command_tx.clone() } } - /// Creates a new channel for [`DiscoveryUpdate`]s. - pub fn event_listener(&mut self) -> ReceiverStream { + /// Creates a new channel for [`NodeRecord`]s. + pub fn node_record_stream(&mut self) -> ReceiverStream { let (tx, rx) = mpsc::channel(256); - self.event_listener.push(tx); + self.node_record_listeners.push(tx); ReceiverStream::new(rx) } /// Sends the event to all listeners. /// /// Remove channels that got closed. - fn notify(&mut self, event: DnsDiscoveryEvent) { - self.event_listener.retain(|listener| listener.try_send(event.clone()).is_ok()); + fn notify(&mut self, record: NodeRecord) { + self.node_record_listeners.retain_mut(|listener| match listener.try_send(record) { + Ok(()) => true, + Err(err) => match err { + TrySendError::Full(_) => true, + TrySendError::Closed(_) => false, + }, + }); } /// Starts syncing the given link to a tree. pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> { - let _link: LinkEntry = link.parse()?; - + self.sync_tree_with_link(link.parse()?); Ok(()) } + /// Starts syncing the given link to a tree. + pub fn sync_tree_with_link(&mut self, link: LinkEntry) { + self.queries.resolve_root(link); + } + /// Resolves an entry - fn resolve_entry(&mut self, _domain: impl Into, _hash: impl Into) {} + fn resolve_entry(&mut self, link: LinkEntry, hash: String, kind: ResolveKind) { + if let Some(entry) = self.dns_record_cache.get(&hash).cloned() { + // already resolved + let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind }; + self.on_resolved_entry(cached); + return + } + self.queries.resolve_entry(link, hash, kind) + } + + fn on_resolved_root(&mut self, resp: ResolveRootResult) { + match resp { + Ok((root, link)) => match self.trees.entry(link.clone()) { + Entry::Occupied(mut entry) => { + entry.get_mut().update_root(root); + } + Entry::Vacant(entry) => { + entry.insert(SyncTree::new(root, link)); + } + }, + Err((err, link)) => { + debug!(target: "disc::dns",?err, ?link, "Failed to lookup root") + } + } + } + + fn on_resolved_enr(&mut self, enr: Enr) { + if let Some(record) = convert_enr_node_record(&enr) { + self.notify(record); + } + self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr)) + } + + fn on_resolved_entry(&mut self, resp: ResolveEntryResult) { + let ResolveEntryResult { entry, link, hash, kind } = resp; + + match entry { + Some(Err(err)) => { + debug!(target: "disc::dns",?err, domain=%link.domain, ?hash, "Failed to lookup entry") + } + None => { + debug!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry") + } + Some(Ok(entry)) => { + // cache entry + self.dns_record_cache.push(hash.clone(), entry.clone()); + + match entry { + DnsEntry::Root(root) => { + debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry"); + } + DnsEntry::Link(link_entry) => { + if kind.is_link() { + if let Some(tree) = self.trees.get_mut(&link) { + tree.resolved_links_mut().insert(hash, link_entry.clone()); + } + self.sync_tree_with_link(link_entry) + } else { + debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry"); + } + } + DnsEntry::Branch(branch_entry) => { + if let Some(tree) = self.trees.get_mut(&link) { + tree.extend_children(kind, branch_entry.children) + } + } + DnsEntry::Node(entry) => { + if kind.is_link() { + debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry"); + } else { + self.on_resolved_enr(entry.enr) + } + } + } + } + } + } /// Advances the state of the DNS discovery service by polling,triggering lookups - pub(crate) fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<()> { - Poll::Pending + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + // drain buffered events first + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event) + } + + // process all incoming commands + while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) { + match cmd { + DnsDiscoveryCommand::SyncTree(link) => { + self.sync_tree_with_link(link); + } + DnsDiscoveryCommand::NodeRecordUpdates(tx) => { + let _ = tx.send(self.node_record_stream()); + } + } + } + + while let Poll::Ready(outcome) = self.queries.poll(cx) { + // handle query outcome + match outcome { + QueryOutcome::Root(resp) => self.on_resolved_root(resp), + QueryOutcome::Entry(resp) => self.on_resolved_entry(resp), + } + } + + let mut progress = false; + let now = Instant::now(); + let mut pending_resolves = Vec::new(); + let mut pending_updates = Vec::new(); + for tree in self.trees.values_mut() { + while let Some(action) = tree.poll(now, self.recheck_interval) { + progress = true; + match action { + SyncAction::UpdateRoot => { + pending_updates.push(tree.link().clone()); + } + SyncAction::Enr(hash) => { + pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr)); + } + SyncAction::Link(hash) => { + pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link)); + } + } + } + } + + for (domain, hash, kind) in pending_resolves { + self.resolve_entry(domain, hash, kind) + } + + for link in pending_updates { + self.sync_tree_with_link(link) + } + + if !progress && self.queued_events.is_empty() { + return Poll::Pending + } + } } } -enum DnsDiscoveryCommand {} +/// A Stream events, mainly used for debugging +impl Stream for DnsDiscoveryService { + type Item = DnsDiscoveryEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Some(ready!(self.get_mut().poll(cx)))) + } +} + +/// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService] +enum DnsDiscoveryCommand { + /// Sync a tree + SyncTree(LinkEntry), + NodeRecordUpdates(oneshot::Sender>), +} /// Represents dns discovery related update events. #[derive(Debug, Clone)] -pub enum DnsDiscoveryEvent {} +pub enum DnsDiscoveryEvent { + /// Resolved an Enr entry via DNS. + Enr(Enr), +} + +/// Converts an [Enr] into a [NodeRecord] +fn convert_enr_node_record(enr: &Enr) -> Option { + let record = NodeRecord { + address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?, + tcp_port: enr.tcp4().or_else(|| enr.tcp6())?, + udp_port: enr.udp4().or_else(|| enr.udp6())?, + id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]), + } + .into_ipv4_mapped(); + Some(record) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tree::TreeRootEntry; + use enr::{EnrBuilder, EnrKey}; + use reth_primitives::Chain; + use secp256k1::rand::thread_rng; + use std::{future::poll_fn, net::Ipv4Addr}; + use tokio_stream::StreamExt; + + #[tokio::test] + async fn test_start_root_sync() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut thread_rng()); + let resolver = MapResolver::default(); + let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; + let mut root: TreeRootEntry = s.parse().unwrap(); + root.sign(&secret_key).unwrap(); + + let link = + LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; + resolver.insert(link.domain.clone(), root.to_string()); + + let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + + service.sync_tree_with_link(link.clone()); + + poll_fn(|cx| { + let _ = service.poll(cx); + Poll::Ready(()) + }) + .await; + + let tree = service.trees.get(&link).unwrap(); + assert_eq!(tree.root().clone(), root); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_get_node() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut thread_rng()); + let resolver = MapResolver::default(); + let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; + let mut root: TreeRootEntry = s.parse().unwrap(); + root.sign(&secret_key).unwrap(); + + let link = + LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; + resolver.insert(link.domain.clone(), root.to_string()); + + let mut builder = EnrBuilder::new("v4"); + builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303); + let enr = builder.build(&secret_key).unwrap(); + + resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); + + let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default()); + + let mut node_records = service.node_record_stream(); + + let task = tokio::task::spawn(async move { + let _ = node_records.next().await.unwrap(); + }); + + service.sync_tree_with_link(link.clone()); + + let event = poll_fn(|cx| service.poll(cx)).await; + + match event { + DnsDiscoveryEvent::Enr(discovered) => { + assert_eq!(discovered, enr); + } + } + + poll_fn(|cx| { + assert!(service.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + task.await.unwrap(); + } + + #[tokio::test] + async fn test_recheck_tree() { + reth_tracing::init_test_tracing(); + + let config = DnsDiscoveryConfig { + recheck_interval: Duration::from_millis(750), + ..Default::default() + }; + + let secret_key = SecretKey::new(&mut thread_rng()); + let resolver = Arc::new(MapResolver::default()); + let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"; + let mut root: TreeRootEntry = s.parse().unwrap(); + root.sign(&secret_key).unwrap(); + + let link = + LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }; + resolver.insert(link.domain.clone(), root.to_string()); + + let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone()); + + service.sync_tree_with_link(link.clone()); + + poll_fn(|cx| { + assert!(service.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + // await recheck timeout + tokio::time::sleep(config.recheck_interval).await; + + let enr = EnrBuilder::new("v4").build(&secret_key).unwrap(); + resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64()); + + let event = poll_fn(|cx| service.poll(cx)).await; + + match event { + DnsDiscoveryEvent::Enr(discovered) => { + assert_eq!(discovered, enr); + } + } + + poll_fn(|cx| { + assert!(service.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + #[ignore] + async fn test_dns_resolver() { + reth_tracing::init_test_tracing(); + + let mut service = DnsDiscoveryService::new( + Arc::new(DnsResolver::from_system_conf().unwrap()), + Default::default(), + ); + + service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap(); + + while let Some(event) = service.next().await { + match event { + DnsDiscoveryEvent::Enr(enr) => { + println!("discovered enr {}", enr.to_base64()); + } + } + } + } +} diff --git a/crates/net/dns/src/query.rs b/crates/net/dns/src/query.rs new file mode 100644 index 000000000..34b5deccc --- /dev/null +++ b/crates/net/dns/src/query.rs @@ -0,0 +1,281 @@ +//! Handles query execution + +use crate::{ + error::{LookupError, LookupResult}, + resolver::Resolver, + sync::ResolveKind, + tree::{DnsEntry, LinkEntry, TreeRootEntry}, +}; +use enr::EnrKeyUnambiguous; +use reth_net_common::ratelimit::{Rate, RateLimit}; +use std::{ + collections::VecDeque, + future::Future, + num::NonZeroUsize, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, + time::Duration, +}; + +/// The `QueryPool` provides an aggregate state machine for driving queries to completion. +pub(crate) struct QueryPool { + /// The [Resolver] that's used to lookup queries. + resolver: Arc, + /// Buffered queries + queued_queries: VecDeque>, + /// All active queries + active_queries: Vec>, + /// buffered results + queued_outcomes: VecDeque>, + /// Rate limit for DNS requests + rate_limit: RateLimit, + /// Timeout for DNS lookups. + lookup_timeout: Duration, +} + +// === impl QueryPool === + +impl QueryPool { + pub(crate) fn new( + resolver: Arc, + max_requests_per_sec: NonZeroUsize, + lookup_timeout: Duration, + ) -> Self { + Self { + resolver, + queued_queries: Default::default(), + active_queries: vec![], + queued_outcomes: Default::default(), + rate_limit: RateLimit::new(Rate::new( + max_requests_per_sec.get() as u64, + Duration::from_secs(1), + )), + lookup_timeout, + } + } + + /// Resolves the root the link's domain references + pub(crate) fn resolve_root(&mut self, link: LinkEntry) { + let resolver = Arc::clone(&self.resolver); + let timeout = self.lookup_timeout; + self.queued_queries.push_back(Query::Root(Box::pin(async move { + resolve_root(resolver, link, timeout).await + }))) + } + + /// Resolves the [DnsEntry] for `` + pub(crate) fn resolve_entry(&mut self, link: LinkEntry, hash: String, kind: ResolveKind) { + let resolver = Arc::clone(&self.resolver); + let timeout = self.lookup_timeout; + self.queued_queries.push_back(Query::Entry(Box::pin(async move { + resolve_entry(resolver, link, hash, kind, timeout).await + }))) + } + + /// Advances the state of the queries + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + // drain buffered events first + if let Some(event) = self.queued_outcomes.pop_front() { + return Poll::Ready(event) + } + + // queue in new queries + 'queries: loop { + if self.rate_limit.poll_ready(cx).is_ready() { + if let Some(query) = self.queued_queries.pop_front() { + self.rate_limit.tick(); + self.active_queries.push(query); + continue 'queries + } + } + break + } + + // advance all queries + for idx in (0..self.active_queries.len()).rev() { + let mut query = self.active_queries.swap_remove(idx); + if let Poll::Ready(outcome) = query.poll(cx) { + self.queued_outcomes.push_back(outcome); + } else { + // still pending + self.active_queries.push(query); + } + } + + if self.queued_outcomes.is_empty() { + return Poll::Pending + } + } + } +} + +// === Various future/type alias === + +pub(crate) struct ResolveEntryResult { + pub(crate) entry: Option>>, + pub(crate) link: LinkEntry, + pub(crate) hash: String, + pub(crate) kind: ResolveKind, +} + +pub(crate) type ResolveRootResult = + Result<(TreeRootEntry, LinkEntry), (LookupError, LinkEntry)>; + +type ResolveRootFuture = Pin> + Send>>; + +type ResolveEntryFuture = Pin> + Send>>; + +enum Query { + Root(ResolveRootFuture), + Entry(ResolveEntryFuture), +} + +// === impl Query === + +impl Query { + /// Advances the query + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + match self { + Query::Root(ref mut query) => { + let outcome = ready!(query.as_mut().poll(cx)); + Poll::Ready(QueryOutcome::Root(outcome)) + } + Query::Entry(ref mut query) => { + let outcome = ready!(query.as_mut().poll(cx)); + Poll::Ready(QueryOutcome::Entry(outcome)) + } + } + } +} + +/// The output the queries return +pub(crate) enum QueryOutcome { + Root(ResolveRootResult), + Entry(ResolveEntryResult), +} + +/// Retrieves the [DnsEntry] +async fn resolve_entry( + resolver: Arc, + link: LinkEntry, + hash: String, + kind: ResolveKind, + timeout: Duration, +) -> ResolveEntryResult { + let fqn = format!("{hash}.{}", link.domain); + let mut resp = ResolveEntryResult { entry: None, link, hash, kind }; + match lookup_with_timeout::(&resolver, &fqn, timeout).await { + Ok(Some(entry)) => { + resp.entry = Some(entry.parse::>().map_err(|err| err.into())) + } + Err(err) => resp.entry = Some(Err(err)), + Ok(None) => {} + } + resp +} + +/// Retrieves the root entry the link points to and returns the verified entry +/// +/// Returns an error if the record could be retrieved but is not a root entry or failed to be +/// verified. +async fn resolve_root( + resolver: Arc, + link: LinkEntry, + timeout: Duration, +) -> ResolveRootResult { + let root = match lookup_with_timeout::(&resolver, &link.domain, timeout).await { + Ok(Some(root)) => root, + Ok(_) => return Err((LookupError::EntryNotFound, link)), + Err(err) => return Err((err, link)), + }; + + match root.parse::() { + Ok(root) => { + if root.verify::(&link.pubkey) { + Ok((root, link)) + } else { + Err((LookupError::InvalidRoot(root), link)) + } + } + Err(err) => Err((err.into(), link)), + } +} + +async fn lookup_with_timeout( + r: &R, + query: &str, + timeout: Duration, +) -> LookupResult> { + match tokio::time::timeout(timeout, r.lookup_txt(query)).await { + Ok(res) => Ok(res), + Err(_) => Err(LookupError::RequestTimedOut), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{resolver::TimeoutResolver, DnsDiscoveryConfig, MapResolver}; + use std::future::poll_fn; + + #[tokio::test] + async fn test_rate_limit() { + let resolver = Arc::new(MapResolver::default()); + let config = DnsDiscoveryConfig::default(); + let mut pool = QueryPool::new(resolver, config.max_requests_per_sec, config.lookup_timeout); + + let s = "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org"; + let entry: LinkEntry = s.parse().unwrap(); + + for _n in 0..config.max_requests_per_sec.get() { + poll_fn(|cx| { + pool.resolve_root(entry.clone()); + assert_eq!(pool.queued_queries.len(), 1); + assert!(pool.rate_limit.poll_ready(cx).is_ready()); + let _ = pool.poll(cx); + assert_eq!(pool.queued_queries.len(), 0); + Poll::Ready(()) + }) + .await; + } + + pool.resolve_root(entry.clone()); + assert_eq!(pool.queued_queries.len(), 1); + poll_fn(|cx| { + assert!(pool.rate_limit.poll_ready(cx).is_pending()); + let _ = pool.poll(cx); + assert_eq!(pool.queued_queries.len(), 1); + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn test_timeouts() { + let config = + DnsDiscoveryConfig { lookup_timeout: Duration::from_millis(500), ..Default::default() }; + let resolver = Arc::new(TimeoutResolver(config.lookup_timeout * 2)); + let mut pool = QueryPool::new(resolver, config.max_requests_per_sec, config.lookup_timeout); + + let s = "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org"; + let entry: LinkEntry = s.parse().unwrap(); + pool.resolve_root(entry); + + let outcome = poll_fn(|cx| pool.poll(cx)).await; + + match outcome { + QueryOutcome::Root(res) => { + let res = res.unwrap_err().0; + match res { + LookupError::RequestTimedOut => {} + _ => unreachable!(), + } + } + QueryOutcome::Entry(_) => { + unreachable!() + } + } + } +} diff --git a/crates/net/dns/src/resolver.rs b/crates/net/dns/src/resolver.rs index 4a5634e12..edc6a3e7f 100644 --- a/crates/net/dns/src/resolver.rs +++ b/crates/net/dns/src/resolver.rs @@ -1,39 +1,121 @@ //! Perform DNS lookups use async_trait::async_trait; -use std::{ - collections::HashMap, - ops::{Deref, DerefMut}, +use parking_lot::RwLock; +use std::collections::HashMap; +use tracing::trace; +pub use trust_dns_resolver::TokioAsyncResolver; +use trust_dns_resolver::{ + error::ResolveError, proto::DnsHandle, AsyncResolver, ConnectionProvider, }; /// A type that can lookup DNS entries #[async_trait] -pub trait Resolver: Send + Sync { - /// Performs a textual lookup. +pub trait Resolver: Send + Sync + Unpin + 'static { + /// Performs a textual lookup and returns the first text async fn lookup_txt(&self, query: &str) -> Option; } -/// A [Resolver] that uses an in memory map to lookup entries -#[derive(Debug, Clone)] -pub struct MapResolver(HashMap); - -impl Deref for MapResolver { - type Target = HashMap; - - fn deref(&self) -> &Self::Target { - &self.0 +#[async_trait] +impl Resolver for AsyncResolver +where + C: DnsHandle, + P: ConnectionProvider, +{ + async fn lookup_txt(&self, query: &str) -> Option { + // See: [AsyncResolver::txt_lookup] + // > *hint* queries that end with a '.' are fully qualified names and are cheaper lookups + let fqn = if query.ends_with('.') { query.to_string() } else { format!("{query}.") }; + match self.txt_lookup(fqn).await { + Err(err) => { + trace!(target: "disc::dns", ?err, ?query, "dns lookup failed"); + None + } + Ok(lookup) => { + let txt = lookup.into_iter().next()?; + let entry = txt.iter().next()?; + String::from_utf8(entry.to_vec()).ok() + } + } } } -impl DerefMut for MapResolver { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +/// An asynchronous DNS resolver +/// +/// See also [TokioAsyncResolver](trust_dns_resolver::TokioAsyncResolver) +/// +/// ``` +/// # fn t() { +/// use reth_dns_discovery::resolver::DnsResolver; +/// let resolver = DnsResolver::from_system_conf().unwrap(); +/// # } +/// ``` +#[derive(Clone)] +pub struct DnsResolver(TokioAsyncResolver); + +// === impl DnsResolver === + +impl DnsResolver { + /// Create a new resolver by wrapping the given [AsyncResolver] + pub fn new(resolver: TokioAsyncResolver) -> Self { + Self(resolver) + } + + /// Constructs a new Tokio based Resolver with the system configuration. + /// + /// This will use `/etc/resolv.conf` on Unix OSes and the registry on Windows. + pub fn from_system_conf() -> Result { + TokioAsyncResolver::tokio_from_system_conf().map(Self::new) + } +} + +#[async_trait] +impl Resolver for DnsResolver { + async fn lookup_txt(&self, query: &str) -> Option { + Resolver::lookup_txt(&self.0, query).await + } +} + +/// A [Resolver] that uses an in memory map to lookup entries +#[derive(Debug, Default)] +pub struct MapResolver(RwLock>); + +// === impl MapResolver === + +impl MapResolver { + /// Inserts a key-value pair into the map. + pub fn insert(&self, k: String, v: String) -> Option { + self.0.write().insert(k, v) + } + + /// Returns the value corresponding to the key + pub fn get(&self, k: &str) -> Option { + self.0.read().get(k).cloned() + } + + /// Removes a key from the map, returning the value at the key if the key was previously in the + /// map. + pub fn remove(&self, k: &str) -> Option { + self.0.write().remove(k) } } #[async_trait] impl Resolver for MapResolver { async fn lookup_txt(&self, query: &str) -> Option { - self.get(query).cloned() + self.get(query) + } +} + +/// A Resolver that always times out. +#[cfg(test)] +pub(crate) struct TimeoutResolver(pub(crate) std::time::Duration); + +#[cfg(test)] +#[async_trait] +impl Resolver for TimeoutResolver { + async fn lookup_txt(&self, _query: &str) -> Option { + tokio::time::sleep(self.0).await; + None } } diff --git a/crates/net/dns/src/sync.rs b/crates/net/dns/src/sync.rs index 1165023f9..0174670b3 100644 --- a/crates/net/dns/src/sync.rs +++ b/crates/net/dns/src/sync.rs @@ -1,11 +1,162 @@ -//! Sync trees - -use crate::tree::LinkEntry; +use crate::tree::{LinkEntry, TreeRootEntry}; use enr::EnrKeyUnambiguous; +use linked_hash_set::LinkedHashSet; use secp256k1::SecretKey; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; /// A sync-able tree pub(crate) struct SyncTree { - /// The link to this tree. + /// Root of the tree + root: TreeRootEntry, + /// Link to this tree link: LinkEntry, + /// Timestamp when the root was updated + root_updated: Instant, + /// The state of the tree sync progress. + sync_state: SyncState, + /// Links contained in this tree + resolved_links: HashMap>, + /// Unresolved links of the tree + unresolved_links: LinkedHashSet, + /// Unresolved nodes of the tree + unresolved_nodes: LinkedHashSet, +} + +// === impl SyncTree === + +impl SyncTree { + pub(crate) fn new(root: TreeRootEntry, link: LinkEntry) -> Self { + Self { + root, + link, + root_updated: Instant::now(), + sync_state: SyncState::Pending, + resolved_links: Default::default(), + unresolved_links: Default::default(), + unresolved_nodes: Default::default(), + } + } + + #[cfg(test)] + pub(crate) fn root(&self) -> &TreeRootEntry { + &self.root + } + + pub(crate) fn link(&self) -> &LinkEntry { + &self.link + } + + pub(crate) fn resolved_links_mut(&mut self) -> &mut HashMap> { + &mut self.resolved_links + } + + pub(crate) fn extend_children( + &mut self, + kind: ResolveKind, + children: impl IntoIterator, + ) { + match kind { + ResolveKind::Enr => { + self.unresolved_nodes.extend(children); + } + ResolveKind::Link => { + self.unresolved_links.extend(children); + } + } + } + + /// Advances the state of the tree by returning actions to perform + pub(crate) fn poll(&mut self, now: Instant, update_timeout: Duration) -> Option { + match self.sync_state { + SyncState::Pending => { + self.sync_state = SyncState::Enr; + return Some(SyncAction::Link(self.root.link_root.clone())) + } + SyncState::Enr => { + self.sync_state = SyncState::Active; + return Some(SyncAction::Enr(self.root.enr_root.clone())) + } + SyncState::Link => { + self.sync_state = SyncState::Active; + return Some(SyncAction::Link(self.root.link_root.clone())) + } + SyncState::Active => { + if now > self.root_updated + update_timeout { + self.sync_state = SyncState::RootUpdate; + return Some(SyncAction::UpdateRoot) + } + } + SyncState::RootUpdate => return None, + } + + if let Some(link) = self.unresolved_links.pop_front() { + return Some(SyncAction::Link(link)) + } + + let enr = self.unresolved_nodes.pop_front()?; + Some(SyncAction::Enr(enr)) + } + + /// Updates the root and returns what changed + pub(crate) fn update_root(&mut self, root: TreeRootEntry) { + let enr = root.enr_root == self.root.enr_root; + let link = root.link_root == self.root.link_root; + + self.root = root; + self.root_updated = Instant::now(); + + let state = match (enr, link) { + (true, true) => { + self.unresolved_nodes.clear(); + self.unresolved_links.clear(); + SyncState::Pending + } + (true, _) => { + self.unresolved_nodes.clear(); + SyncState::Enr + } + (_, true) => { + self.unresolved_links.clear(); + SyncState::Link + } + _ => { + // unchanged + return + } + }; + self.sync_state = state; + } +} + +/// The action to perform by the service +pub(crate) enum SyncAction { + UpdateRoot, + Enr(String), + Link(String), +} + +/// How the [SyncTree::update_root] changed the root +enum SyncState { + RootUpdate, + Pending, + Enr, + Link, + Active, +} + +/// What kind of hash to resolve +pub(crate) enum ResolveKind { + Enr, + Link, +} + +// === impl ResolveKind === + +impl ResolveKind { + pub(crate) fn is_link(&self) -> bool { + matches!(self, ResolveKind::Link) + } } diff --git a/crates/net/dns/src/tree.rs b/crates/net/dns/src/tree.rs index b384416a2..36e59c878 100644 --- a/crates/net/dns/src/tree.rs +++ b/crates/net/dns/src/tree.rs @@ -16,10 +16,16 @@ //! `signature` is a 65-byte secp256k1 EC signature over the keccak256 hash of the record //! content, excluding the sig= part, encoded as URL-safe base64 (RFC-4648). -use crate::tree::ParseDnsEntryError::{FieldNotFound, UnknownEntry}; +#![allow(missing_docs)] + +use crate::error::{ + ParseDnsEntryError, + ParseDnsEntryError::{FieldNotFound, UnknownEntry}, + ParseEntryResult, +}; use bytes::Bytes; use data_encoding::{BASE32_NOPAD, BASE64URL_NOPAD}; -use enr::{Enr, EnrKey, EnrKeyUnambiguous, EnrPublicKey}; +use enr::{Enr, EnrError, EnrKey, EnrKeyUnambiguous, EnrPublicKey}; use reth_primitives::hex; use secp256k1::SecretKey; use std::{fmt, str::FromStr}; @@ -50,24 +56,6 @@ impl fmt::Display for DnsEntry { } } -/// Error while parsing a [DnsEntry] -#[derive(thiserror::Error, Debug)] -#[allow(missing_docs)] -pub enum ParseDnsEntryError { - #[error("Unknown entry: {0}")] - UnknownEntry(String), - #[error("Field {0} not found.")] - FieldNotFound(&'static str), - #[error("Base64 decoding failed: {0}")] - Base64DecodeError(String), - #[error("Base32 decoding failed: {0}")] - Base32DecodeError(String), - #[error("{0}")] - RlpDecodeError(String), - #[error("{0}")] - Other(String), -} - impl FromStr for DnsEntry { type Err = ParseDnsEntryError; @@ -87,12 +75,12 @@ impl FromStr for DnsEntry { } /// Represents an `enr-root` hash of subtrees containing nodes and links. -#[derive(Clone)] +#[derive(Clone, Eq, PartialEq)] pub struct TreeRootEntry { - enr_root: String, - link_root: String, - sequence_number: u64, - signature: Bytes, + pub enr_root: String, + pub link_root: String, + pub sequence_number: u64, + pub signature: Bytes, } // === impl TreeRootEntry === @@ -101,7 +89,7 @@ impl TreeRootEntry { /// Parses the entry from text. /// /// Caution: This assumes the prefix is already removed. - fn parse_value(mut input: &str) -> Result { + fn parse_value(mut input: &str) -> ParseEntryResult { let input = &mut input; let enr_root = parse_value(input, "e=", "ENR Root", |s| Ok(s.to_string()))?; let link_root = parse_value(input, "l=", "Link Root", |s| Ok(s.to_string()))?; @@ -132,6 +120,13 @@ impl TreeRootEntry { ) } + /// Signs the content with the given key + pub fn sign(&mut self, key: &K) -> Result<(), EnrError> { + let sig = key.sign_v4(self.content().as_bytes()).map_err(|_| EnrError::SigningError)?; + self.signature = sig.into(); + Ok(()) + } + /// Verify the signature of the record. #[must_use] pub fn verify(&self, pubkey: &K::PublicKey) -> bool { @@ -173,7 +168,7 @@ impl fmt::Display for TreeRootEntry { /// A branch entry with base32 hashes #[derive(Debug, Clone)] pub struct BranchEntry { - children: Vec, + pub children: Vec, } // === impl BranchEntry === @@ -182,7 +177,7 @@ impl BranchEntry { /// Parses the entry from text. /// /// Caution: This assumes the prefix is already removed. - fn parse_value(input: &str) -> Result { + fn parse_value(input: &str) -> ParseEntryResult { let children = input.trim().split(',').map(str::to_string).collect(); Ok(Self { children }) } @@ -209,8 +204,8 @@ impl fmt::Display for BranchEntry { /// A link entry #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub struct LinkEntry { - domain: String, - pubkey: K::PublicKey, + pub domain: String, + pub pubkey: K::PublicKey, } // === impl LinkEntry === @@ -219,7 +214,7 @@ impl LinkEntry { /// Parses the entry from text. /// /// Caution: This assumes the prefix is already removed. - fn parse_value(input: &str) -> Result { + fn parse_value(input: &str) -> ParseEntryResult { let (pubkey, domain) = input.split_once('@').ok_or_else(|| { ParseDnsEntryError::Other(format!("Missing @ delimiter in Link entry: {input}")) })?; @@ -259,7 +254,7 @@ impl fmt::Display for LinkEntry { /// The actual [Enr] entry. #[derive(Debug, Clone)] pub struct NodeEntry { - enr: Enr, + pub enr: Enr, } // === impl NodeEntry === @@ -268,7 +263,7 @@ impl NodeEntry { /// Parses the entry from text. /// /// Caution: This assumes the prefix is already removed. - fn parse_value(s: &str) -> Result { + fn parse_value(s: &str) -> ParseEntryResult { let enr: Enr = s.parse().map_err(ParseDnsEntryError::Other)?; Ok(Self { enr }) } @@ -293,14 +288,9 @@ impl fmt::Display for NodeEntry { } /// Parses the value of the key value pair -fn parse_value( - input: &mut &str, - key: &str, - err: &'static str, - f: F, -) -> Result +fn parse_value(input: &mut &str, key: &str, err: &'static str, f: F) -> ParseEntryResult where - F: Fn(&str) -> Result, + F: Fn(&str) -> ParseEntryResult, { ensure_strip_key(input, key, err)?; let val = input.split_whitespace().next().ok_or(FieldNotFound(err))?; @@ -312,11 +302,7 @@ where /// Strips the `key` from the `input` /// /// Returns an err if the `input` does not start with the `key` -fn ensure_strip_key( - input: &mut &str, - key: &str, - err: &'static str, -) -> Result<(), ParseDnsEntryError> { +fn ensure_strip_key(input: &mut &str, key: &str, err: &'static str) -> ParseEntryResult<()> { *input = input.trim_start().strip_prefix(key).ok_or(FieldNotFound(err))?; Ok(()) } diff --git a/crates/primitives/src/chain.rs b/crates/primitives/src/chain.rs index 99a9e015e..43c167b23 100644 --- a/crates/primitives/src/chain.rs +++ b/crates/primitives/src/chain.rs @@ -14,6 +14,21 @@ pub enum Chain { } impl Chain { + /// Returns the mainnet chain. + pub const fn mainnet() -> Self { + Chain::Named(ethers_core::types::Chain::Mainnet) + } + + /// Returns the goerli chain. + pub const fn goerli() -> Self { + Chain::Named(ethers_core::types::Chain::Goerli) + } + + /// Returns the sepolia chain. + pub const fn sepolia() -> Self { + Chain::Named(ethers_core::types::Chain::Sepolia) + } + /// The id of the chain pub fn id(&self) -> u64 { match self { @@ -30,6 +45,21 @@ impl Chain { Chain::Id(_) => false, } } + + /// Returns the address of the public DNS node list for the given chain. + /// + /// See also + pub fn public_dns_network_protocol(self) -> Option { + use ethers_core::types::Chain::*; + const DNS_PREFIX: &str = "enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@"; + + let named: ethers_core::types::Chain = self.try_into().ok()?; + + if matches!(named, Mainnet | Goerli | Sepolia | Ropsten | Rinkeby) { + return Some(format!("{DNS_PREFIX}all.{}.ethdisco.net", named.as_ref().to_lowercase())) + } + None + } } impl fmt::Display for Chain { @@ -251,4 +281,11 @@ mod tests { assert_eq!(chain.length(), 3); } + + #[test] + fn test_dns_network() { + let s = "enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.mainnet.ethdisco.net"; + let chain: Chain = ethers_core::types::Chain::Mainnet.into(); + assert_eq!(s, chain.public_dns_network_protocol().unwrap().as_str()); + } }