feat(disc): add DnsDiscovery service (#794)

Matthias Seitz
2023-01-12 12:49:02 +01:00
committed by GitHub
parent 23984e3db7
commit 09c5a04c7b
10 changed files with 1119 additions and 110 deletions
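
For orientation, the commit wires a DNS resolver into a pollable discovery service. Below is a minimal usage sketch, closely following the new (ignored) test_dns_resolver test added to the crate's lib.rs further down; treat it as illustrative rather than a documented entry point — the #[tokio::main] wrapper and the unbounded drain loop are assumptions of this sketch, not part of the commit.

use reth_dns_discovery::{DnsDiscoveryEvent, DnsDiscoveryService, DnsResolver};
use reth_primitives::Chain;
use std::sync::Arc;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Resolver backed by the system DNS configuration (/etc/resolv.conf on Unix).
    let resolver = Arc::new(DnsResolver::from_system_conf().unwrap());
    // Service with default settings: 5s lookup timeout, 3 requests/s, 30min recheck interval.
    let mut service = DnsDiscoveryService::new(resolver, Default::default());
    // Start syncing the public EIP-1459 node list for mainnet.
    service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();
    // The service does nothing unless polled; driving it as a Stream yields resolved ENR entries.
    while let Some(DnsDiscoveryEvent::Enr(enr)) = service.next().await {
        println!("discovered enr {}", enr.to_base64());
    }
}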

Cargo.lock generated

@@ -1103,7 +1103,7 @@ dependencies = [
"hex",
"hkdf",
"lazy_static",
"lru",
"lru 0.7.8",
"more-asserts",
"parking_lot 0.11.2",
"rand 0.8.5",
@@ -2556,6 +2556,15 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17"
dependencies = [
"hashbrown 0.13.1",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@@ -3734,7 +3743,12 @@ dependencies = [
"bytes",
"data-encoding",
"enr 0.7.0",
"linked_hash_set",
"lru 0.9.0",
"parking_lot 0.12.1",
"reth-net-common",
"reth-primitives",
"reth-tracing",
"secp256k1 0.24.2",
"thiserror",
"tokio",


@@ -10,6 +10,7 @@ description = "Support for EIP-1459 Node Discovery via DNS"
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-net-common = { path = "../common" }
# ethereum
secp256k1 = { version = "0.24", features = [
@@ -28,7 +29,15 @@ trust-dns-resolver = "0.22"
# misc
data-encoding = "2"
async-trait = "0.1"
bytes = "1.2"
tracing = "0.1"
linked_hash_set = "0.1"
lru = "0.9"
thiserror = "1.0"
async-trait = "0.1.61"
tracing = "0.1"
parking_lot = "0.12"
[dev-dependencies]
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread"] }
reth-tracing = { path = "../../tracing" }


@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{num::NonZeroUsize, time::Duration};
/// Settings for the [DnsDiscoveryClient](crate::DnsDiscoveryClient).
#[derive(Debug, Clone)]
@@ -7,17 +7,25 @@ pub struct DnsDiscoveryConfig {
///
/// Default: 5s
pub lookup_timeout: Duration,
/// The rate at which lookups should be re-triggered.
/// The DNS request rate limit
///
/// Default: 3
pub max_requests_per_sec: NonZeroUsize,
/// The rate at which trees should be updated.
///
/// Default: 30min
pub lookup_interval: Duration,
pub recheck_interval: Duration,
/// Maximum number of cached DNS records.
pub dns_record_cache_limit: NonZeroUsize,
}
impl Default for DnsDiscoveryConfig {
fn default() -> Self {
Self {
lookup_timeout: Duration::from_secs(5),
lookup_interval: Duration::from_secs(60 * 30),
max_requests_per_sec: NonZeroUsize::new(3).unwrap(),
recheck_interval: Duration::from_secs(60 * 30),
dns_record_cache_limit: NonZeroUsize::new(1_000).unwrap(),
}
}
}
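
All four settings are public fields, so callers can override a subset with struct-update syntax (the recheck test further down does exactly this for recheck_interval). A small sketch:

use reth_dns_discovery::DnsDiscoveryConfig;
use std::{num::NonZeroUsize, time::Duration};

fn main() {
    // Tighten the tree recheck interval and raise the DNS request budget; the
    // remaining fields keep their defaults (5s lookup timeout, 1_000 cached records).
    let config = DnsDiscoveryConfig {
        recheck_interval: Duration::from_secs(60),
        max_requests_per_sec: NonZeroUsize::new(5).unwrap(),
        ..Default::default()
    };
    assert_eq!(config.lookup_timeout, Duration::from_secs(5));
}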


@@ -0,0 +1,38 @@
use crate::tree::TreeRootEntry;
/// Alias for a parse result
pub(crate) type ParseEntryResult<T> = Result<T, ParseDnsEntryError>;
pub(crate) type LookupResult<T> = Result<T, LookupError>;
/// Error while parsing a [DnsEntry]
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum ParseDnsEntryError {
#[error("Unknown entry: {0}")]
UnknownEntry(String),
#[error("Field {0} not found.")]
FieldNotFound(&'static str),
#[error("Base64 decoding failed: {0}")]
Base64DecodeError(String),
#[error("Base32 decoding failed: {0}")]
Base32DecodeError(String),
#[error("{0}")]
RlpDecodeError(String),
#[error("{0}")]
Other(String),
}
/// Errors that can happen during lookups
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub(crate) enum LookupError {
#[error(transparent)]
Parse(#[from] ParseDnsEntryError),
#[error("Failed to verify root {0}")]
InvalidRoot(TreeRootEntry),
#[error("Request timed out")]
RequestTimedOut,
#[error("Entry not found")]
EntryNotFound,
}


@@ -4,30 +4,48 @@
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
// TODO rm later
#![allow(missing_docs, unreachable_pub, unused)]
//! Implementation of [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459) Node Discovery via DNS.
use std::{
collections::HashMap,
sync::Arc,
task::{Context, Poll},
pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
use crate::{
query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
sync::{ResolveKind, SyncAction},
tree::{DnsEntry, LinkEntry},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
pub use config::DnsDiscoveryConfig;
use enr::Enr;
use error::ParseDnsEntryError;
use lru::LruCache;
use reth_primitives::{NodeRecord, PeerId};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::IpAddr,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use sync::SyncTree;
use tokio::sync::{
mpsc,
mpsc::{error::TrySendError, UnboundedSender},
oneshot,
};
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
Stream,
};
use tracing::{debug, warn};
mod config;
mod error;
mod query;
pub mod resolver;
mod sync;
pub mod tree;
use crate::{
sync::SyncTree,
tree::{LinkEntry, ParseDnsEntryError},
};
pub use config::DnsDiscoveryConfig;
/// [DnsDiscoveryService] front-end.
#[derive(Clone)]
pub struct DnsDiscoveryHandle {
@@ -37,33 +55,88 @@ pub struct DnsDiscoveryHandle {
// === impl DnsDiscovery ===
impl DnsDiscoveryHandle {}
impl DnsDiscoveryHandle {
/// Starts syncing the given link to a tree.
pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
self.sync_tree_with_link(link.parse()?);
Ok(())
}
/// Starts syncing the given link to a tree.
pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
}
/// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s.
pub async fn node_record_stream(
&self,
) -> Result<ReceiverStream<NodeRecord>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
let _ = self.to_service.send(cmd);
rx.await
}
}
/// A client that discovers nodes via DNS.
#[must_use = "Service does nothing unless polled"]
pub struct DnsDiscoveryService {
pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
/// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand.
command_tx: UnboundedSender<DnsDiscoveryCommand>,
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
/// All subscribers for event updates.
event_listener: Vec<mpsc::Sender<DnsDiscoveryEvent>>,
/// All subscribers for resolved [NodeRecord]s.
node_record_listeners: Vec<mpsc::Sender<NodeRecord>>,
/// All the trees that can be synced.
trees: HashMap<Arc<LinkEntry>, SyncTree>,
trees: HashMap<LinkEntry, SyncTree>,
/// All queries currently in progress
queries: QueryPool<R, SecretKey>,
/// Cached dns records
dns_record_cache: LruCache<String, DnsEntry<SecretKey>>,
/// all buffered events
queued_events: VecDeque<DnsDiscoveryEvent>,
/// The rate at which trees should be updated.
recheck_interval: Duration,
}
// === impl DnsDiscoveryService ===
impl DnsDiscoveryService {
impl<R: Resolver> DnsDiscoveryService<R> {
/// Creates a new instance of the [DnsDiscoveryService] using the given settings.
pub fn new(_config: DnsDiscoveryConfig) -> Self {
todo!()
///
/// ```
/// use std::sync::Arc;
/// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver};
/// # fn t() {
/// let service =
/// DnsDiscoveryService::new(Arc::new(DnsResolver::from_system_conf().unwrap()), Default::default());
/// # }
/// ```
pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
let DnsDiscoveryConfig {
lookup_timeout,
max_requests_per_sec,
recheck_interval,
dns_record_cache_limit,
} = config;
let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
let (command_tx, command_rx) = mpsc::unbounded_channel();
Self {
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
node_record_listeners: Default::default(),
trees: Default::default(),
queries,
dns_record_cache: LruCache::new(dns_record_cache_limit),
queued_events: Default::default(),
recheck_interval,
}
}
/// Same as [DnsDiscoveryService::new] but also returns a new handle that's connected to the
/// service
pub fn new_pair(config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
let service = Self::new(config);
pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
let service = Self::new(resolver, config);
let handle = service.handle();
(service, handle)
}
@@ -73,38 +146,368 @@ impl DnsDiscoveryService {
DnsDiscoveryHandle { to_service: self.command_tx.clone() }
}
/// Creates a new channel for [`DiscoveryUpdate`]s.
pub fn event_listener(&mut self) -> ReceiverStream<DnsDiscoveryEvent> {
/// Creates a new channel for [`NodeRecord`]s.
pub fn node_record_stream(&mut self) -> ReceiverStream<NodeRecord> {
let (tx, rx) = mpsc::channel(256);
self.event_listener.push(tx);
self.node_record_listeners.push(tx);
ReceiverStream::new(rx)
}
/// Sends the event to all listeners.
///
/// Remove channels that got closed.
fn notify(&mut self, event: DnsDiscoveryEvent) {
self.event_listener.retain(|listener| listener.try_send(event.clone()).is_ok());
fn notify(&mut self, record: NodeRecord) {
self.node_record_listeners.retain_mut(|listener| match listener.try_send(record) {
Ok(()) => true,
Err(err) => match err {
TrySendError::Full(_) => true,
TrySendError::Closed(_) => false,
},
});
}
/// Starts syncing the given link to a tree.
pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
let _link: LinkEntry = link.parse()?;
self.sync_tree_with_link(link.parse()?);
Ok(())
}
/// Starts syncing the given link to a tree.
pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
self.queries.resolve_root(link);
}
/// Resolves an entry
fn resolve_entry(&mut self, _domain: impl Into<String>, _hash: impl Into<String>) {}
fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
// already resolved
let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
self.on_resolved_entry(cached);
return
}
self.queries.resolve_entry(link, hash, kind)
}
fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
match resp {
Ok((root, link)) => match self.trees.entry(link.clone()) {
Entry::Occupied(mut entry) => {
entry.get_mut().update_root(root);
}
Entry::Vacant(entry) => {
entry.insert(SyncTree::new(root, link));
}
},
Err((err, link)) => {
debug!(target: "disc::dns",?err, ?link, "Failed to lookup root")
}
}
}
fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
if let Some(record) = convert_enr_node_record(&enr) {
self.notify(record);
}
self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
}
fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
let ResolveEntryResult { entry, link, hash, kind } = resp;
match entry {
Some(Err(err)) => {
debug!(target: "disc::dns",?err, domain=%link.domain, ?hash, "Failed to lookup entry")
}
None => {
debug!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
}
Some(Ok(entry)) => {
// cache entry
self.dns_record_cache.push(hash.clone(), entry.clone());
match entry {
DnsEntry::Root(root) => {
debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
}
DnsEntry::Link(link_entry) => {
if kind.is_link() {
if let Some(tree) = self.trees.get_mut(&link) {
tree.resolved_links_mut().insert(hash, link_entry.clone());
}
self.sync_tree_with_link(link_entry)
} else {
debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
}
}
DnsEntry::Branch(branch_entry) => {
if let Some(tree) = self.trees.get_mut(&link) {
tree.extend_children(kind, branch_entry.children)
}
}
DnsEntry::Node(entry) => {
if kind.is_link() {
debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
} else {
self.on_resolved_enr(entry.enr)
}
}
}
}
}
}
/// Advances the state of the DNS discovery service by polling,triggering lookups
pub(crate) fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
loop {
// drain buffered events first
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event)
}
// process all incoming commands
while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
match cmd {
DnsDiscoveryCommand::SyncTree(link) => {
self.sync_tree_with_link(link);
}
DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
let _ = tx.send(self.node_record_stream());
}
}
}
enum DnsDiscoveryCommand {}
while let Poll::Ready(outcome) = self.queries.poll(cx) {
// handle query outcome
match outcome {
QueryOutcome::Root(resp) => self.on_resolved_root(resp),
QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
}
}
let mut progress = false;
let now = Instant::now();
let mut pending_resolves = Vec::new();
let mut pending_updates = Vec::new();
for tree in self.trees.values_mut() {
while let Some(action) = tree.poll(now, self.recheck_interval) {
progress = true;
match action {
SyncAction::UpdateRoot => {
pending_updates.push(tree.link().clone());
}
SyncAction::Enr(hash) => {
pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
}
SyncAction::Link(hash) => {
pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
}
}
}
}
for (domain, hash, kind) in pending_resolves {
self.resolve_entry(domain, hash, kind)
}
for link in pending_updates {
self.sync_tree_with_link(link)
}
if !progress && self.queued_events.is_empty() {
return Poll::Pending
}
}
}
}
/// A Stream events, mainly used for debugging
impl<R: Resolver> Stream for DnsDiscoveryService<R> {
type Item = DnsDiscoveryEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
}
}
/// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService]
enum DnsDiscoveryCommand {
/// Sync a tree
SyncTree(LinkEntry),
NodeRecordUpdates(oneshot::Sender<ReceiverStream<NodeRecord>>),
}
/// Represents dns discovery related update events.
#[derive(Debug, Clone)]
pub enum DnsDiscoveryEvent {}
pub enum DnsDiscoveryEvent {
/// Resolved an Enr entry via DNS.
Enr(Enr<SecretKey>),
}
/// Converts an [Enr] into a [NodeRecord]
fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<NodeRecord> {
let record = NodeRecord {
address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
udp_port: enr.udp4().or_else(|| enr.udp6())?,
id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]),
}
.into_ipv4_mapped();
Some(record)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tree::TreeRootEntry;
use enr::{EnrBuilder, EnrKey};
use reth_primitives::Chain;
use secp256k1::rand::thread_rng;
use std::{future::poll_fn, net::Ipv4Addr};
use tokio_stream::StreamExt;
#[tokio::test]
async fn test_start_root_sync() {
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = MapResolver::default();
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
service.sync_tree_with_link(link.clone());
poll_fn(|cx| {
let _ = service.poll(cx);
Poll::Ready(())
})
.await;
let tree = service.trees.get(&link).unwrap();
assert_eq!(tree.root().clone(), root);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_get_node() {
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = MapResolver::default();
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut builder = EnrBuilder::new("v4");
builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303);
let enr = builder.build(&secret_key).unwrap();
resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
let mut node_records = service.node_record_stream();
let task = tokio::task::spawn(async move {
let _ = node_records.next().await.unwrap();
});
service.sync_tree_with_link(link.clone());
let event = poll_fn(|cx| service.poll(cx)).await;
match event {
DnsDiscoveryEvent::Enr(discovered) => {
assert_eq!(discovered, enr);
}
}
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
task.await.unwrap();
}
#[tokio::test]
async fn test_recheck_tree() {
reth_tracing::init_test_tracing();
let config = DnsDiscoveryConfig {
recheck_interval: Duration::from_millis(750),
..Default::default()
};
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = Arc::new(MapResolver::default());
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());
service.sync_tree_with_link(link.clone());
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
// await recheck timeout
tokio::time::sleep(config.recheck_interval).await;
let enr = EnrBuilder::new("v4").build(&secret_key).unwrap();
resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
let event = poll_fn(|cx| service.poll(cx)).await;
match event {
DnsDiscoveryEvent::Enr(discovered) => {
assert_eq!(discovered, enr);
}
}
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
#[ignore]
async fn test_dns_resolver() {
reth_tracing::init_test_tracing();
let mut service = DnsDiscoveryService::new(
Arc::new(DnsResolver::from_system_conf().unwrap()),
Default::default(),
);
service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();
while let Some(event) = service.next().await {
match event {
DnsDiscoveryEvent::Enr(enr) => {
println!("discovered enr {}", enr.to_base64());
}
}
}
}
}
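
A hedged sketch of the handle-based flow introduced above (new_pair plus DnsDiscoveryCommand::NodeRecordUpdates): spawn the service on a task, then request a NodeRecord stream through the handle. Spawning the service as a plain drain loop and the Debug print of the record are assumptions of this sketch, not something the commit prescribes; the enrtree link is the mainnet one from the chain spec change below.

use reth_dns_discovery::{DnsDiscoveryService, DnsResolver};
use std::sync::Arc;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let resolver = Arc::new(DnsResolver::from_system_conf().unwrap());
    let (mut service, mut handle) =
        DnsDiscoveryService::new_pair(resolver, Default::default());

    // The service only makes progress while polled; here it is simply drained on a task.
    tokio::spawn(async move { while service.next().await.is_some() {} });

    // Subscribe to resolved NodeRecords, then kick off a sync via the handle.
    let mut records = handle.node_record_stream().await.unwrap();
    handle
        .sync_tree("enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.mainnet.ethdisco.net")
        .unwrap();

    if let Some(record) = records.next().await {
        println!("first discovered node: {record:?}");
    }
}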

crates/net/dns/src/query.rs (new file)

@@ -0,0 +1,281 @@
//! Handles query execution
use crate::{
error::{LookupError, LookupResult},
resolver::Resolver,
sync::ResolveKind,
tree::{DnsEntry, LinkEntry, TreeRootEntry},
};
use enr::EnrKeyUnambiguous;
use reth_net_common::ratelimit::{Rate, RateLimit};
use std::{
collections::VecDeque,
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Duration,
};
/// The `QueryPool` provides an aggregate state machine for driving queries to completion.
pub(crate) struct QueryPool<R: Resolver, K: EnrKeyUnambiguous> {
/// The [Resolver] that's used to lookup queries.
resolver: Arc<R>,
/// Buffered queries
queued_queries: VecDeque<Query<K>>,
/// All active queries
active_queries: Vec<Query<K>>,
/// buffered results
queued_outcomes: VecDeque<QueryOutcome<K>>,
/// Rate limit for DNS requests
rate_limit: RateLimit,
/// Timeout for DNS lookups.
lookup_timeout: Duration,
}
// === impl QueryPool ===
impl<R: Resolver, K: EnrKeyUnambiguous> QueryPool<R, K> {
pub(crate) fn new(
resolver: Arc<R>,
max_requests_per_sec: NonZeroUsize,
lookup_timeout: Duration,
) -> Self {
Self {
resolver,
queued_queries: Default::default(),
active_queries: vec![],
queued_outcomes: Default::default(),
rate_limit: RateLimit::new(Rate::new(
max_requests_per_sec.get() as u64,
Duration::from_secs(1),
)),
lookup_timeout,
}
}
/// Resolves the root the link's domain references
pub(crate) fn resolve_root(&mut self, link: LinkEntry<K>) {
let resolver = Arc::clone(&self.resolver);
let timeout = self.lookup_timeout;
self.queued_queries.push_back(Query::Root(Box::pin(async move {
resolve_root(resolver, link, timeout).await
})))
}
/// Resolves the [DnsEntry] for `<hash.domain>`
pub(crate) fn resolve_entry(&mut self, link: LinkEntry<K>, hash: String, kind: ResolveKind) {
let resolver = Arc::clone(&self.resolver);
let timeout = self.lookup_timeout;
self.queued_queries.push_back(Query::Entry(Box::pin(async move {
resolve_entry(resolver, link, hash, kind, timeout).await
})))
}
/// Advances the state of the queries
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<QueryOutcome<K>> {
loop {
// drain buffered events first
if let Some(event) = self.queued_outcomes.pop_front() {
return Poll::Ready(event)
}
// queue in new queries
'queries: loop {
if self.rate_limit.poll_ready(cx).is_ready() {
if let Some(query) = self.queued_queries.pop_front() {
self.rate_limit.tick();
self.active_queries.push(query);
continue 'queries
}
}
break
}
// advance all queries
for idx in (0..self.active_queries.len()).rev() {
let mut query = self.active_queries.swap_remove(idx);
if let Poll::Ready(outcome) = query.poll(cx) {
self.queued_outcomes.push_back(outcome);
} else {
// still pending
self.active_queries.push(query);
}
}
if self.queued_outcomes.is_empty() {
return Poll::Pending
}
}
}
}
// === Various future/type alias ===
pub(crate) struct ResolveEntryResult<K: EnrKeyUnambiguous> {
pub(crate) entry: Option<LookupResult<DnsEntry<K>>>,
pub(crate) link: LinkEntry<K>,
pub(crate) hash: String,
pub(crate) kind: ResolveKind,
}
pub(crate) type ResolveRootResult<K> =
Result<(TreeRootEntry, LinkEntry<K>), (LookupError, LinkEntry<K>)>;
type ResolveRootFuture<K> = Pin<Box<dyn Future<Output = ResolveRootResult<K>> + Send>>;
type ResolveEntryFuture<K> = Pin<Box<dyn Future<Output = ResolveEntryResult<K>> + Send>>;
enum Query<K: EnrKeyUnambiguous> {
Root(ResolveRootFuture<K>),
Entry(ResolveEntryFuture<K>),
}
// === impl Query ===
impl<K: EnrKeyUnambiguous> Query<K> {
/// Advances the query
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<QueryOutcome<K>> {
match self {
Query::Root(ref mut query) => {
let outcome = ready!(query.as_mut().poll(cx));
Poll::Ready(QueryOutcome::Root(outcome))
}
Query::Entry(ref mut query) => {
let outcome = ready!(query.as_mut().poll(cx));
Poll::Ready(QueryOutcome::Entry(outcome))
}
}
}
}
/// The output the queries return
pub(crate) enum QueryOutcome<K: EnrKeyUnambiguous> {
Root(ResolveRootResult<K>),
Entry(ResolveEntryResult<K>),
}
/// Retrieves the [DnsEntry]
async fn resolve_entry<K: EnrKeyUnambiguous, R: Resolver>(
resolver: Arc<R>,
link: LinkEntry<K>,
hash: String,
kind: ResolveKind,
timeout: Duration,
) -> ResolveEntryResult<K> {
let fqn = format!("{hash}.{}", link.domain);
let mut resp = ResolveEntryResult { entry: None, link, hash, kind };
match lookup_with_timeout::<R>(&resolver, &fqn, timeout).await {
Ok(Some(entry)) => {
resp.entry = Some(entry.parse::<DnsEntry<K>>().map_err(|err| err.into()))
}
Err(err) => resp.entry = Some(Err(err)),
Ok(None) => {}
}
resp
}
/// Retrieves the root entry the link points to and returns the verified entry
///
/// Returns an error if the record could be retrieved but is not a root entry or failed to be
/// verified.
async fn resolve_root<K: EnrKeyUnambiguous, R: Resolver>(
resolver: Arc<R>,
link: LinkEntry<K>,
timeout: Duration,
) -> ResolveRootResult<K> {
let root = match lookup_with_timeout::<R>(&resolver, &link.domain, timeout).await {
Ok(Some(root)) => root,
Ok(_) => return Err((LookupError::EntryNotFound, link)),
Err(err) => return Err((err, link)),
};
match root.parse::<TreeRootEntry>() {
Ok(root) => {
if root.verify::<K>(&link.pubkey) {
Ok((root, link))
} else {
Err((LookupError::InvalidRoot(root), link))
}
}
Err(err) => Err((err.into(), link)),
}
}
async fn lookup_with_timeout<R: Resolver>(
r: &R,
query: &str,
timeout: Duration,
) -> LookupResult<Option<String>> {
match tokio::time::timeout(timeout, r.lookup_txt(query)).await {
Ok(res) => Ok(res),
Err(_) => Err(LookupError::RequestTimedOut),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{resolver::TimeoutResolver, DnsDiscoveryConfig, MapResolver};
use std::future::poll_fn;
#[tokio::test]
async fn test_rate_limit() {
let resolver = Arc::new(MapResolver::default());
let config = DnsDiscoveryConfig::default();
let mut pool = QueryPool::new(resolver, config.max_requests_per_sec, config.lookup_timeout);
let s = "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org";
let entry: LinkEntry = s.parse().unwrap();
for _n in 0..config.max_requests_per_sec.get() {
poll_fn(|cx| {
pool.resolve_root(entry.clone());
assert_eq!(pool.queued_queries.len(), 1);
assert!(pool.rate_limit.poll_ready(cx).is_ready());
let _ = pool.poll(cx);
assert_eq!(pool.queued_queries.len(), 0);
Poll::Ready(())
})
.await;
}
pool.resolve_root(entry.clone());
assert_eq!(pool.queued_queries.len(), 1);
poll_fn(|cx| {
assert!(pool.rate_limit.poll_ready(cx).is_pending());
let _ = pool.poll(cx);
assert_eq!(pool.queued_queries.len(), 1);
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn test_timeouts() {
let config =
DnsDiscoveryConfig { lookup_timeout: Duration::from_millis(500), ..Default::default() };
let resolver = Arc::new(TimeoutResolver(config.lookup_timeout * 2));
let mut pool = QueryPool::new(resolver, config.max_requests_per_sec, config.lookup_timeout);
let s = "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org";
let entry: LinkEntry = s.parse().unwrap();
pool.resolve_root(entry);
let outcome = poll_fn(|cx| pool.poll(cx)).await;
match outcome {
QueryOutcome::Root(res) => {
let res = res.unwrap_err().0;
match res {
LookupError::RequestTimedOut => {}
_ => unreachable!(),
}
}
QueryOutcome::Entry(_) => {
unreachable!()
}
}
}
}


@@ -1,39 +1,121 @@
//! Perform DNS lookups
use async_trait::async_trait;
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
use parking_lot::RwLock;
use std::collections::HashMap;
use tracing::trace;
pub use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::{
error::ResolveError, proto::DnsHandle, AsyncResolver, ConnectionProvider,
};
/// A type that can lookup DNS entries
#[async_trait]
pub trait Resolver: Send + Sync {
/// Performs a textual lookup.
pub trait Resolver: Send + Sync + Unpin + 'static {
/// Performs a textual lookup and returns the first text
async fn lookup_txt(&self, query: &str) -> Option<String>;
}
#[async_trait]
impl<C, P> Resolver for AsyncResolver<C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
async fn lookup_txt(&self, query: &str) -> Option<String> {
// See: [AsyncResolver::txt_lookup]
// > *hint* queries that end with a '.' are fully qualified names and are cheaper lookups
let fqn = if query.ends_with('.') { query.to_string() } else { format!("{query}.") };
match self.txt_lookup(fqn).await {
Err(err) => {
trace!(target: "disc::dns", ?err, ?query, "dns lookup failed");
None
}
Ok(lookup) => {
let txt = lookup.into_iter().next()?;
let entry = txt.iter().next()?;
String::from_utf8(entry.to_vec()).ok()
}
}
}
}
/// An asynchronous DNS resolver
///
/// See also [TokioAsyncResolver](trust_dns_resolver::TokioAsyncResolver)
///
/// ```
/// # fn t() {
/// use reth_dns_discovery::resolver::DnsResolver;
/// let resolver = DnsResolver::from_system_conf().unwrap();
/// # }
/// ```
#[derive(Clone)]
pub struct DnsResolver(TokioAsyncResolver);
// === impl DnsResolver ===
impl DnsResolver {
/// Create a new resolver by wrapping the given [AsyncResolver]
pub fn new(resolver: TokioAsyncResolver) -> Self {
Self(resolver)
}
/// Constructs a new Tokio based Resolver with the system configuration.
///
/// This will use `/etc/resolv.conf` on Unix OSes and the registry on Windows.
pub fn from_system_conf() -> Result<Self, ResolveError> {
TokioAsyncResolver::tokio_from_system_conf().map(Self::new)
}
}
#[async_trait]
impl Resolver for DnsResolver {
async fn lookup_txt(&self, query: &str) -> Option<String> {
Resolver::lookup_txt(&self.0, query).await
}
}
/// A [Resolver] that uses an in memory map to lookup entries
#[derive(Debug, Clone)]
pub struct MapResolver(HashMap<String, String>);
#[derive(Debug, Default)]
pub struct MapResolver(RwLock<HashMap<String, String>>);
impl Deref for MapResolver {
type Target = HashMap<String, String>;
// === impl MapResolver ===
fn deref(&self) -> &Self::Target {
&self.0
}
impl MapResolver {
/// Inserts a key-value pair into the map.
pub fn insert(&self, k: String, v: String) -> Option<String> {
self.0.write().insert(k, v)
}
impl DerefMut for MapResolver {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
/// Returns the value corresponding to the key
pub fn get(&self, k: &str) -> Option<String> {
self.0.read().get(k).cloned()
}
/// Removes a key from the map, returning the value at the key if the key was previously in the
/// map.
pub fn remove(&self, k: &str) -> Option<String> {
self.0.write().remove(k)
}
}
#[async_trait]
impl Resolver for MapResolver {
async fn lookup_txt(&self, query: &str) -> Option<String> {
self.get(query).cloned()
self.get(query)
}
}
/// A Resolver that always times out.
#[cfg(test)]
pub(crate) struct TimeoutResolver(pub(crate) std::time::Duration);
#[cfg(test)]
#[async_trait]
impl Resolver for TimeoutResolver {
async fn lookup_txt(&self, _query: &str) -> Option<String> {
tokio::time::sleep(self.0).await;
None
}
}
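
Since Resolver is a public trait with a single method, callers can plug in their own lookup backend. A minimal sketch with a hypothetical StaticResolver (not part of this commit), mirroring what MapResolver does but without the interior mutability:

use async_trait::async_trait;
use reth_dns_discovery::{DnsDiscoveryService, Resolver};
use std::{collections::HashMap, sync::Arc};

/// Hypothetical resolver that answers TXT lookups from a fixed map.
struct StaticResolver(HashMap<String, String>);

#[async_trait]
impl Resolver for StaticResolver {
    async fn lookup_txt(&self, query: &str) -> Option<String> {
        self.0.get(query).cloned()
    }
}

fn main() {
    // Any Resolver implementation can back the service, e.g. for tests or custom transports.
    let _service =
        DnsDiscoveryService::new(Arc::new(StaticResolver(HashMap::new())), Default::default());
}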


@@ -1,11 +1,162 @@
//! Sync trees
use crate::tree::LinkEntry;
use crate::tree::{LinkEntry, TreeRootEntry};
use enr::EnrKeyUnambiguous;
use linked_hash_set::LinkedHashSet;
use secp256k1::SecretKey;
use std::{
collections::HashMap,
time::{Duration, Instant},
};
/// A sync-able tree
pub(crate) struct SyncTree<K: EnrKeyUnambiguous = SecretKey> {
/// The link to this tree.
/// Root of the tree
root: TreeRootEntry,
/// Link to this tree
link: LinkEntry<K>,
/// Timestamp when the root was updated
root_updated: Instant,
/// The state of the tree sync progress.
sync_state: SyncState,
/// Links contained in this tree
resolved_links: HashMap<String, LinkEntry<K>>,
/// Unresolved links of the tree
unresolved_links: LinkedHashSet<String>,
/// Unresolved nodes of the tree
unresolved_nodes: LinkedHashSet<String>,
}
// === impl SyncTree ===
impl<K: EnrKeyUnambiguous> SyncTree<K> {
pub(crate) fn new(root: TreeRootEntry, link: LinkEntry<K>) -> Self {
Self {
root,
link,
root_updated: Instant::now(),
sync_state: SyncState::Pending,
resolved_links: Default::default(),
unresolved_links: Default::default(),
unresolved_nodes: Default::default(),
}
}
#[cfg(test)]
pub(crate) fn root(&self) -> &TreeRootEntry {
&self.root
}
pub(crate) fn link(&self) -> &LinkEntry<K> {
&self.link
}
pub(crate) fn resolved_links_mut(&mut self) -> &mut HashMap<String, LinkEntry<K>> {
&mut self.resolved_links
}
pub(crate) fn extend_children(
&mut self,
kind: ResolveKind,
children: impl IntoIterator<Item = String>,
) {
match kind {
ResolveKind::Enr => {
self.unresolved_nodes.extend(children);
}
ResolveKind::Link => {
self.unresolved_links.extend(children);
}
}
}
/// Advances the state of the tree by returning actions to perform
pub(crate) fn poll(&mut self, now: Instant, update_timeout: Duration) -> Option<SyncAction> {
match self.sync_state {
SyncState::Pending => {
self.sync_state = SyncState::Enr;
return Some(SyncAction::Link(self.root.link_root.clone()))
}
SyncState::Enr => {
self.sync_state = SyncState::Active;
return Some(SyncAction::Enr(self.root.enr_root.clone()))
}
SyncState::Link => {
self.sync_state = SyncState::Active;
return Some(SyncAction::Link(self.root.link_root.clone()))
}
SyncState::Active => {
if now > self.root_updated + update_timeout {
self.sync_state = SyncState::RootUpdate;
return Some(SyncAction::UpdateRoot)
}
}
SyncState::RootUpdate => return None,
}
if let Some(link) = self.unresolved_links.pop_front() {
return Some(SyncAction::Link(link))
}
let enr = self.unresolved_nodes.pop_front()?;
Some(SyncAction::Enr(enr))
}
/// Updates the root and returns what changed
pub(crate) fn update_root(&mut self, root: TreeRootEntry) {
let enr = root.enr_root == self.root.enr_root;
let link = root.link_root == self.root.link_root;
self.root = root;
self.root_updated = Instant::now();
let state = match (enr, link) {
(true, true) => {
self.unresolved_nodes.clear();
self.unresolved_links.clear();
SyncState::Pending
}
(true, _) => {
self.unresolved_nodes.clear();
SyncState::Enr
}
(_, true) => {
self.unresolved_links.clear();
SyncState::Link
}
_ => {
// unchanged
return
}
};
self.sync_state = state;
}
}
/// The action to perform by the service
pub(crate) enum SyncAction {
UpdateRoot,
Enr(String),
Link(String),
}
/// How the [SyncTree::update_root] changed the root
enum SyncState {
RootUpdate,
Pending,
Enr,
Link,
Active,
}
/// What kind of hash to resolve
pub(crate) enum ResolveKind {
Enr,
Link,
}
// === impl ResolveKind ===
impl ResolveKind {
pub(crate) fn is_link(&self) -> bool {
matches!(self, ResolveKind::Link)
}
}
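
To make the ordering of the state machine above concrete: a freshly constructed tree first emits a Link action for the root's link subtree, then an Enr action for the ENR subtree, and afterwards only drains hashes queued via extend_children (or asks for a root update once the recheck interval has elapsed). Because SyncTree is crate-private this can only be exercised in-crate; a hypothetical unit-test sketch, reusing the sample root and link strings from the service and query tests:

#[cfg(test)]
mod sync_sketch_tests {
    use super::*;
    use crate::tree::{LinkEntry, TreeRootEntry};
    use std::time::{Duration, Instant};

    #[test]
    fn initial_sync_actions() {
        let root: TreeRootEntry =
            "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"
                .parse()
                .unwrap();
        let link: LinkEntry =
            "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org"
                .parse()
                .unwrap();
        let mut tree = SyncTree::new(root, link);
        let now = Instant::now();
        let recheck = Duration::from_secs(60 * 30);

        // Pending: resolve the link subtree root first, then the ENR subtree root.
        assert!(matches!(tree.poll(now, recheck), Some(SyncAction::Link(_))));
        assert!(matches!(tree.poll(now, recheck), Some(SyncAction::Enr(_))));
        // Active with nothing queued and the root still fresh: no further action yet.
        assert!(tree.poll(now, recheck).is_none());
    }
}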


@@ -16,10 +16,16 @@
//! `signature` is a 65-byte secp256k1 EC signature over the keccak256 hash of the record
//! content, excluding the sig= part, encoded as URL-safe base64 (RFC-4648).
use crate::tree::ParseDnsEntryError::{FieldNotFound, UnknownEntry};
#![allow(missing_docs)]
use crate::error::{
ParseDnsEntryError,
ParseDnsEntryError::{FieldNotFound, UnknownEntry},
ParseEntryResult,
};
use bytes::Bytes;
use data_encoding::{BASE32_NOPAD, BASE64URL_NOPAD};
use enr::{Enr, EnrKey, EnrKeyUnambiguous, EnrPublicKey};
use enr::{Enr, EnrError, EnrKey, EnrKeyUnambiguous, EnrPublicKey};
use reth_primitives::hex;
use secp256k1::SecretKey;
use std::{fmt, str::FromStr};
@@ -50,24 +56,6 @@ impl<K: EnrKeyUnambiguous> fmt::Display for DnsEntry<K> {
}
}
/// Error while parsing a [DnsEntry]
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum ParseDnsEntryError {
#[error("Unknown entry: {0}")]
UnknownEntry(String),
#[error("Field {0} not found.")]
FieldNotFound(&'static str),
#[error("Base64 decoding failed: {0}")]
Base64DecodeError(String),
#[error("Base32 decoding failed: {0}")]
Base32DecodeError(String),
#[error("{0}")]
RlpDecodeError(String),
#[error("{0}")]
Other(String),
}
impl<K: EnrKeyUnambiguous> FromStr for DnsEntry<K> {
type Err = ParseDnsEntryError;
@@ -87,12 +75,12 @@ impl<K: EnrKeyUnambiguous> FromStr for DnsEntry<K> {
}
/// Represents an `enr-root` hash of subtrees containing nodes and links.
#[derive(Clone)]
#[derive(Clone, Eq, PartialEq)]
pub struct TreeRootEntry {
enr_root: String,
link_root: String,
sequence_number: u64,
signature: Bytes,
pub enr_root: String,
pub link_root: String,
pub sequence_number: u64,
pub signature: Bytes,
}
// === impl TreeRootEntry ===
@@ -101,7 +89,7 @@ impl TreeRootEntry {
/// Parses the entry from text.
///
/// Caution: This assumes the prefix is already removed.
fn parse_value(mut input: &str) -> Result<Self, ParseDnsEntryError> {
fn parse_value(mut input: &str) -> ParseEntryResult<Self> {
let input = &mut input;
let enr_root = parse_value(input, "e=", "ENR Root", |s| Ok(s.to_string()))?;
let link_root = parse_value(input, "l=", "Link Root", |s| Ok(s.to_string()))?;
@@ -132,6 +120,13 @@ impl TreeRootEntry {
)
}
/// Signs the content with the given key
pub fn sign<K: EnrKey>(&mut self, key: &K) -> Result<(), EnrError> {
let sig = key.sign_v4(self.content().as_bytes()).map_err(|_| EnrError::SigningError)?;
self.signature = sig.into();
Ok(())
}
/// Verify the signature of the record.
#[must_use]
pub fn verify<K: EnrKey>(&self, pubkey: &K::PublicKey) -> bool {
@@ -173,7 +168,7 @@ impl fmt::Display for TreeRootEntry {
/// A branch entry with base32 hashes
#[derive(Debug, Clone)]
pub struct BranchEntry {
children: Vec<String>,
pub children: Vec<String>,
}
// === impl BranchEntry ===
@@ -182,7 +177,7 @@ impl BranchEntry {
/// Parses the entry from text.
///
/// Caution: This assumes the prefix is already removed.
fn parse_value(input: &str) -> Result<Self, ParseDnsEntryError> {
fn parse_value(input: &str) -> ParseEntryResult<Self> {
let children = input.trim().split(',').map(str::to_string).collect();
Ok(Self { children })
}
@@ -209,8 +204,8 @@ impl fmt::Display for BranchEntry {
/// A link entry
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct LinkEntry<K: EnrKeyUnambiguous = SecretKey> {
domain: String,
pubkey: K::PublicKey,
pub domain: String,
pub pubkey: K::PublicKey,
}
// === impl LinkEntry ===
@@ -219,7 +214,7 @@ impl<K: EnrKeyUnambiguous> LinkEntry<K> {
/// Parses the entry from text.
///
/// Caution: This assumes the prefix is already removed.
fn parse_value(input: &str) -> Result<Self, ParseDnsEntryError> {
fn parse_value(input: &str) -> ParseEntryResult<Self> {
let (pubkey, domain) = input.split_once('@').ok_or_else(|| {
ParseDnsEntryError::Other(format!("Missing @ delimiter in Link entry: {input}"))
})?;
@@ -259,7 +254,7 @@ impl<K: EnrKeyUnambiguous> fmt::Display for LinkEntry<K> {
/// The actual [Enr] entry.
#[derive(Debug, Clone)]
pub struct NodeEntry<K: EnrKeyUnambiguous> {
enr: Enr<K>,
pub enr: Enr<K>,
}
// === impl NodeEntry ===
@@ -268,7 +263,7 @@ impl<K: EnrKeyUnambiguous> NodeEntry<K> {
/// Parses the entry from text.
///
/// Caution: This assumes the prefix is already removed.
fn parse_value(s: &str) -> Result<Self, ParseDnsEntryError> {
fn parse_value(s: &str) -> ParseEntryResult<Self> {
let enr: Enr<K> = s.parse().map_err(ParseDnsEntryError::Other)?;
Ok(Self { enr })
}
@@ -293,14 +288,9 @@ impl<K: EnrKeyUnambiguous> fmt::Display for NodeEntry<K> {
}
/// Parses the value of the key value pair
fn parse_value<F, V>(
input: &mut &str,
key: &str,
err: &'static str,
f: F,
) -> Result<V, ParseDnsEntryError>
fn parse_value<F, V>(input: &mut &str, key: &str, err: &'static str, f: F) -> ParseEntryResult<V>
where
F: Fn(&str) -> Result<V, ParseDnsEntryError>,
F: Fn(&str) -> ParseEntryResult<V>,
{
ensure_strip_key(input, key, err)?;
let val = input.split_whitespace().next().ok_or(FieldNotFound(err))?;
@@ -312,11 +302,7 @@ where
/// Strips the `key` from the `input`
///
/// Returns an err if the `input` does not start with the `key`
fn ensure_strip_key(
input: &mut &str,
key: &str,
err: &'static str,
) -> Result<(), ParseDnsEntryError> {
fn ensure_strip_key(input: &mut &str, key: &str, err: &'static str) -> ParseEntryResult<()> {
*input = input.trim_start().strip_prefix(key).ok_or(FieldNotFound(err))?;
Ok(())
}
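
The new pub fields and the ParseEntryResult plumbing make the entry types usable directly from other crates. A sketch of parsing the textual forms; the root and link strings are the ones used in the new tests, while the "enrtree-branch:" prefix for the branch example is taken from EIP-1459 rather than from a hunk visible here:

use reth_dns_discovery::tree::{DnsEntry, LinkEntry, TreeRootEntry};
use secp256k1::SecretKey;

fn main() {
    // Link entry: base32 compressed pubkey + domain.
    let link: LinkEntry =
        "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@nodes.example.org"
            .parse()
            .unwrap();
    println!("link domain: {}", link.domain);

    // Signed tree root; the now-public fields expose the subtree root hashes.
    let root: TreeRootEntry =
        "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE"
            .parse()
            .unwrap();
    println!("enr root: {}, link root: {}", root.enr_root, root.link_root);

    // DnsEntry dispatches on the prefix and wraps the concrete entry type.
    let entry: DnsEntry<SecretKey> =
        "enrtree-branch:QFT4PBCRX4XQCV3VUYJ6BTCEPU,JGUFMSAGI7KZYB3P7IZW4S5Y3A"
            .parse()
            .unwrap();
    assert!(matches!(entry, DnsEntry::Branch(_)));
}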


@@ -14,6 +14,21 @@ pub enum Chain {
}
impl Chain {
/// Returns the mainnet chain.
pub const fn mainnet() -> Self {
Chain::Named(ethers_core::types::Chain::Mainnet)
}
/// Returns the goerli chain.
pub const fn goerli() -> Self {
Chain::Named(ethers_core::types::Chain::Goerli)
}
/// Returns the sepolia chain.
pub const fn sepolia() -> Self {
Chain::Named(ethers_core::types::Chain::Sepolia)
}
/// The id of the chain
pub fn id(&self) -> u64 {
match self {
@@ -30,6 +45,21 @@ impl Chain {
Chain::Id(_) => false,
}
}
/// Returns the address of the public DNS node list for the given chain.
///
/// See also <https://github.com/ethereum/discv4-dns-lists>
pub fn public_dns_network_protocol(self) -> Option<String> {
use ethers_core::types::Chain::*;
const DNS_PREFIX: &str = "enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@";
let named: ethers_core::types::Chain = self.try_into().ok()?;
if matches!(named, Mainnet | Goerli | Sepolia | Ropsten | Rinkeby) {
return Some(format!("{DNS_PREFIX}all.{}.ethdisco.net", named.as_ref().to_lowercase()))
}
None
}
}
impl fmt::Display for Chain {
@@ -251,4 +281,11 @@ mod tests {
assert_eq!(chain.length(), 3);
}
#[test]
fn test_dns_network() {
let s = "enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.mainnet.ethdisco.net";
let chain: Chain = ethers_core::types::Chain::Mainnet.into();
assert_eq!(s, chain.public_dns_network_protocol().unwrap().as_str());
}
}