feat: integrate DNS discovery service (#832)

This commit is contained in:
Matthias Seitz
2023-01-13 10:34:13 +01:00
committed by GitHub
parent ba3048ceb0
commit 7767b216bc
10 changed files with 200 additions and 43 deletions

2
Cargo.lock generated
View File

@ -3749,6 +3749,7 @@ dependencies = [
"parking_lot 0.12.1",
"reth-net-common",
"reth-primitives",
"reth-rlp",
"reth-tracing",
"secp256k1 0.24.2",
"thiserror",
@ -3989,6 +3990,7 @@ dependencies = [
"pin-project",
"rand 0.8.5",
"reth-discv4",
"reth-dns-discovery",
"reth-ecies",
"reth-eth-wire",
"reth-interfaces",

View File

@ -11,6 +11,7 @@ description = "Support for EIP-1459 Node Discovery via DNS"
# reth
reth-primitives = { path = "../../primitives" }
reth-net-common = { path = "../common" }
reth-rlp = { path = "../../common/rlp" }
# ethereum
secp256k1 = { version = "0.24", features = [

View File

@ -1,4 +1,5 @@
use std::{num::NonZeroUsize, time::Duration};
use crate::tree::LinkEntry;
use std::{collections::HashSet, num::NonZeroUsize, time::Duration};
/// Settings for the [DnsDiscoveryClient](crate::DnsDiscoveryClient).
#[derive(Debug, Clone)]
@ -17,6 +18,8 @@ pub struct DnsDiscoveryConfig {
pub recheck_interval: Duration,
/// Maximum number of cached DNS records.
pub dns_record_cache_limit: NonZeroUsize,
/// Links to the DNS networks to bootstrap.
pub bootstrap_dns_networks: Option<HashSet<LinkEntry>>,
}
impl Default for DnsDiscoveryConfig {
@ -26,6 +29,7 @@ impl Default for DnsDiscoveryConfig {
max_requests_per_sec: NonZeroUsize::new(3).unwrap(),
recheck_interval: Duration::from_secs(60 * 30),
dns_record_cache_limit: NonZeroUsize::new(1_000).unwrap(),
bootstrap_dns_networks: Some(Default::default()),
}
}
}

View File

@ -17,10 +17,10 @@ pub use config::DnsDiscoveryConfig;
use enr::Enr;
use error::ParseDnsEntryError;
use lru::LruCache;
use reth_primitives::{NodeRecord, PeerId};
use reth_primitives::{ForkId, NodeRecord, PeerId};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
net::IpAddr,
pin::Pin,
sync::Arc,
@ -28,16 +28,19 @@ use std::{
time::{Duration, Instant},
};
use sync::SyncTree;
use tokio::sync::{
mpsc,
mpsc::{error::TrySendError, UnboundedSender},
oneshot,
use tokio::{
sync::{
mpsc,
mpsc::{error::TrySendError, UnboundedSender},
oneshot,
},
task::JoinHandle,
};
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
Stream,
Stream, StreamExt,
};
use tracing::{debug, warn};
use tracing::{debug, trace, warn};
mod config;
mod error;
@ -70,7 +73,7 @@ impl DnsDiscoveryHandle {
/// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s.
pub async fn node_record_stream(
&self,
) -> Result<ReceiverStream<NodeRecord>, oneshot::error::RecvError> {
) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
let _ = self.to_service.send(cmd);
@ -86,7 +89,7 @@ pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
/// All subscribers for resolved [NodeRecord]s.
node_record_listeners: Vec<mpsc::Sender<NodeRecord>>,
node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
/// All the trees that can be synced.
trees: HashMap<LinkEntry, SyncTree>,
/// All queries currently in progress
@ -97,6 +100,8 @@ pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
queued_events: VecDeque<DnsDiscoveryEvent>,
/// The rate at which trees should be updated.
recheck_interval: Duration,
/// Links to the DNS networks to bootstrap.
bootstrap_dns_networks: HashSet<LinkEntry>,
}
// === impl DnsDiscoveryService ===
@ -118,6 +123,7 @@ impl<R: Resolver> DnsDiscoveryService<R> {
max_requests_per_sec,
recheck_interval,
dns_record_cache_limit,
bootstrap_dns_networks,
} = config;
let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
let (command_tx, command_rx) = mpsc::unbounded_channel();
@ -130,6 +136,27 @@ impl<R: Resolver> DnsDiscoveryService<R> {
dns_record_cache: LruCache::new(dns_record_cache_limit),
queued_events: Default::default(),
recheck_interval,
bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
}
}
/// Spawns this services onto a new task
///
/// Note: requires a running runtime
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::task::spawn(async move {
self.bootstrap();
while let Some(event) = self.next().await {
trace!(target : "disc::dns", ?event, "processed");
}
})
}
/// Starts discovery with all configured bootstrap links
pub fn bootstrap(&mut self) {
for link in self.bootstrap_dns_networks.clone() {
self.sync_tree_with_link(link);
}
}
@ -147,7 +174,7 @@ impl<R: Resolver> DnsDiscoveryService<R> {
}
/// Creates a new channel for [`NodeRecord`]s.
pub fn node_record_stream(&mut self) -> ReceiverStream<NodeRecord> {
pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
let (tx, rx) = mpsc::channel(256);
self.node_record_listeners.push(tx);
ReceiverStream::new(rx)
@ -156,8 +183,8 @@ impl<R: Resolver> DnsDiscoveryService<R> {
/// Sends the event to all listeners.
///
/// Remove channels that got closed.
fn notify(&mut self, record: NodeRecord) {
self.node_record_listeners.retain_mut(|listener| match listener.try_send(record) {
fn notify(&mut self, record: DnsNodeRecordUpdate) {
self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
Ok(()) => true,
Err(err) => match err {
TrySendError::Full(_) => true,
@ -329,11 +356,20 @@ impl<R: Resolver> Stream for DnsDiscoveryService<R> {
}
}
/// The converted discovered [Enr] object
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DnsNodeRecordUpdate {
/// Discovered node and it's addresses
pub node_record: NodeRecord,
/// The forkid of the node, if present in the ENR
pub fork_id: Option<ForkId>,
}
/// Commands sent from [DnsDiscoveryHandle] to [DnsDiscoveryService]
enum DnsDiscoveryCommand {
/// Sync a tree
SyncTree(LinkEntry),
NodeRecordUpdates(oneshot::Sender<ReceiverStream<NodeRecord>>),
NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
}
/// Represents dns discovery related update events.
@ -344,15 +380,21 @@ pub enum DnsDiscoveryEvent {
}
/// Converts an [Enr] into a [NodeRecord]
fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<NodeRecord> {
let record = NodeRecord {
fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
use reth_rlp::Decodable;
let node_record = NodeRecord {
address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
udp_port: enr.udp4().or_else(|| enr.udp6())?,
id: PeerId::from_slice(&enr.public_key().serialize_uncompressed()[1..]),
}
.into_ipv4_mapped();
Some(record)
let mut maybe_fork_id = enr.get(b"eth")?;
let fork_id = ForkId::decode(&mut maybe_fork_id).ok();
Some(DnsNodeRecordUpdate { node_record, fork_id })
}
#[cfg(test)]
@ -360,7 +402,8 @@ mod tests {
use super::*;
use crate::tree::TreeRootEntry;
use enr::{EnrBuilder, EnrKey};
use reth_primitives::Chain;
use reth_primitives::{Chain, Hardfork};
use reth_rlp::Encodable;
use secp256k1::rand::thread_rng;
use std::{future::poll_fn, net::Ipv4Addr};
use tokio_stream::StreamExt;
@ -408,7 +451,10 @@ mod tests {
resolver.insert(link.domain.clone(), root.to_string());
let mut builder = EnrBuilder::new("v4");
builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303);
let mut buf = Vec::new();
let fork_id = Hardfork::Frontier.fork_id();
fork_id.encode(&mut buf);
builder.ip4(Ipv4Addr::LOCALHOST).udp4(30303).tcp4(30303).add_value(b"eth", &buf);
let enr = builder.build(&secret_key).unwrap();
resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
@ -418,7 +464,8 @@ mod tests {
let mut node_records = service.node_record_stream();
let task = tokio::task::spawn(async move {
let _ = node_records.next().await.unwrap();
let record = node_records.next().await.unwrap();
assert_eq!(record.fork_id, Some(fork_id));
});
service.sync_tree_with_link(link.clone());

View File

@ -4,10 +4,8 @@ use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
use tracing::trace;
pub use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::{
error::ResolveError, proto::DnsHandle, AsyncResolver, ConnectionProvider,
};
pub use trust_dns_resolver::{error::ResolveError, TokioAsyncResolver};
use trust_dns_resolver::{proto::DnsHandle, AsyncResolver, ConnectionProvider};
/// A type that can lookup DNS entries
#[async_trait]

View File

@ -22,6 +22,7 @@ reth-primitives = { path = "../../primitives" }
reth-net-common = { path = "../common" }
reth-network-api = { path = "../network-api" }
reth-discv4 = { path = "../discv4" }
reth-dns-discovery = { path = "../dns" }
reth-eth-wire = { path = "../eth-wire" }
reth-ecies = { path = "../ecies" }
reth-rlp = { path = "../../common/rlp" }

View File

@ -25,6 +25,7 @@ mod __reexport {
pub use secp256k1::SecretKey;
}
pub use __reexport::*;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_ecies::util::pk2id;
use reth_eth_wire::{HelloMessage, Status};
@ -41,6 +42,8 @@ pub struct NetworkConfig<C> {
pub secret_key: SecretKey,
/// All boot nodes to start network discovery with.
pub boot_nodes: HashSet<NodeRecord>,
/// How to set up discovery over DNS.
pub dns_discovery_config: Option<DnsDiscoveryConfig>,
/// How to set up discovery.
pub discovery_v4_config: Option<Discv4Config>,
/// Address to use for discovery
@ -124,6 +127,8 @@ pub struct NetworkConfigBuilder<C> {
client: Arc<C>,
/// The node's secret key, from which the node's identity is derived.
secret_key: SecretKey,
/// How to configure discovery over DNS.
dns_discovery_config: Option<DnsDiscoveryConfig>,
/// How to set up discovery.
discovery_v4_builder: Option<Discv4ConfigBuilder>,
/// All boot nodes to start network discovery with.
@ -164,6 +169,7 @@ impl<C> NetworkConfigBuilder<C> {
Self {
client,
secret_key,
dns_discovery_config: Some(Default::default()),
discovery_v4_builder: Some(Default::default()),
boot_nodes: Default::default(),
discovery_addr: None,
@ -274,6 +280,12 @@ impl<C> NetworkConfigBuilder<C> {
self
}
/// Sets the dns discovery config to use.
pub fn dns_discovery(mut self, config: DnsDiscoveryConfig) -> Self {
self.dns_discovery_config = Some(config);
self
}
/// Sets the boot nodes.
pub fn boot_nodes(mut self, nodes: impl IntoIterator<Item = NodeRecord>) -> Self {
self.boot_nodes = nodes.into_iter().collect();
@ -299,6 +311,7 @@ impl<C> NetworkConfigBuilder<C> {
let Self {
client,
secret_key,
mut dns_discovery_config,
discovery_v4_builder,
boot_nodes,
discovery_addr,
@ -331,10 +344,22 @@ impl<C> NetworkConfigBuilder<C> {
ForkFilter::new(head, genesis_hash, Hardfork::all_forks())
});
// If default DNS config is used then we add the known dns network to bootstrap from
if let Some(dns_networks) =
dns_discovery_config.as_mut().and_then(|c| c.bootstrap_dns_networks.as_mut())
{
if dns_networks.is_empty() {
if let Some(link) = chain.public_dns_network_protocol() {
dns_networks.insert(link.parse().expect("is valid DNS link entry"));
}
}
}
NetworkConfig {
client,
secret_key,
boot_nodes,
dns_discovery_config,
discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()),
discovery_addr: discovery_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
@ -376,3 +401,28 @@ impl NetworkMode {
matches!(self, NetworkMode::Stake)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::thread_rng;
use reth_dns_discovery::tree::LinkEntry;
use reth_provider::test_utils::NoopProvider;
fn builder() -> NetworkConfigBuilder<NoopProvider> {
let secret_key = SecretKey::new(&mut thread_rng());
NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key)
}
#[test]
fn test_network_dns_defaults() {
let config = builder().build();
let dns = config.dns_discovery_config.unwrap();
let bootstrap_nodes = dns.bootstrap_dns_networks.unwrap();
let mainnet_dns: LinkEntry =
Chain::mainnet().public_dns_network_protocol().unwrap().parse().unwrap();
assert!(bootstrap_nodes.contains(&mainnet_dns));
assert_eq!(bootstrap_nodes.len(), 1);
}
}

View File

@ -3,11 +3,15 @@
use crate::error::NetworkError;
use futures::StreamExt;
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
use reth_dns_discovery::{
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
};
use reth_primitives::{ForkId, NodeRecord, PeerId};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::{IpAddr, SocketAddr},
sync::Arc,
task::{Context, Poll},
};
use tokio::task::JoinHandle;
@ -27,10 +31,16 @@ pub struct Discovery {
discv4: Option<Discv4>,
/// All KAD table updates from the discv4 service.
discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
/// Events buffered until polled.
queued_events: VecDeque<DiscoveryEvent>,
/// The handle to the spawned discv4 service
_discv4_service: Option<JoinHandle<()>>,
/// Handler to interact with the DNS discovery service
_dns_discovery: Option<DnsDiscoveryHandle>,
/// Updates from the DNS discovery service.
dns_discovery_updates: Option<ReceiverStream<DnsNodeRecordUpdate>>,
/// The handle to the spawned DNS discovery service
_dns_disc_service: Option<JoinHandle<()>>,
/// Events buffered until polled.
queued_events: VecDeque<DiscoveryEvent>,
}
impl Discovery {
@ -42,7 +52,9 @@ impl Discovery {
discovery_addr: SocketAddr,
sk: SecretKey,
discv4_config: Option<Discv4Config>,
dns_discovery_config: Option<DnsDiscoveryConfig>,
) -> Result<Self, NetworkError> {
// setup discv4
let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk);
let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config {
let (discv4, mut discv4_service) =
@ -57,6 +69,20 @@ impl Discovery {
(None, None, None)
};
// setup DNS discovery
let (_dns_discovery, dns_discovery_updates, _dns_disc_service) =
if let Some(dns_config) = dns_discovery_config {
let (mut service, dns_disc) = DnsDiscoveryService::new_pair(
Arc::new(DnsResolver::from_system_conf()?),
dns_config,
);
let dns_discovery_updates = service.node_record_stream();
let dns_disc_service = service.spawn();
(Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))
} else {
(None, None, None)
};
Ok(Self {
local_enr,
discv4,
@ -64,6 +90,9 @@ impl Discovery {
_discv4_service,
discovered_nodes: Default::default(),
queued_events: Default::default(),
_dns_disc_service,
_dns_discovery,
dns_discovery_updates,
})
}
@ -94,18 +123,23 @@ impl Discovery {
self.local_enr.id
}
/// Processes an incoming [NodeRecord] update from a discovery service
fn on_node_record_update(&mut self, record: NodeRecord, _fork_id: Option<ForkId>) {
let id = record.id;
let addr = record.tcp_addr();
match self.discovered_nodes.entry(id) {
Entry::Occupied(_entry) => {}
Entry::Vacant(entry) => {
entry.insert(addr);
self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr))
}
}
}
fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
match update {
DiscoveryUpdate::Added(node) => {
let id = node.id;
let addr = node.tcp_addr();
match self.discovered_nodes.entry(id) {
Entry::Occupied(_entry) => {}
Entry::Vacant(entry) => {
entry.insert(addr);
self.queued_events.push_back(DiscoveryEvent::Discovered(id, addr))
}
}
DiscoveryUpdate::Added(record) => {
self.on_node_record_update(record, None);
}
DiscoveryUpdate::EnrForkId(node, fork_id) => {
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
@ -128,13 +162,19 @@ impl Discovery {
return Poll::Ready(event)
}
// drain the update stream
// drain the update streams
while let Some(Poll::Ready(Some(update))) =
self.discv4_updates.as_mut().map(|disc_updates| disc_updates.poll_next_unpin(cx))
self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
{
self.on_discv4_update(update)
}
while let Some(Poll::Ready(Some(update))) =
self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
{
self.on_node_record_update(update.node_record, update.fork_id);
}
if self.queued_events.is_empty() {
return Poll::Pending
}
@ -160,6 +200,9 @@ impl Discovery {
discv4_updates: Default::default(),
queued_events: Default::default(),
_discv4_service: Default::default(),
_dns_discovery: None,
dns_discovery_updates: None,
_dns_disc_service: None,
}
}
}
@ -185,6 +228,8 @@ mod tests {
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let _discovery =
Discovery::new(discovery_addr, secret_key, Default::default()).await.unwrap();
Discovery::new(discovery_addr, secret_key, Default::default(), Default::default())
.await
.unwrap();
}
}

View File

@ -1,6 +1,7 @@
//! Possible errors when interacting with the network.
use crate::session::PendingSessionHandshakeError;
use reth_dns_discovery::resolver::ResolveError;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
@ -12,10 +13,15 @@ use std::{fmt, io, io::ErrorKind};
pub enum NetworkError {
/// General IO error.
#[error(transparent)]
Io(#[from] std::io::Error),
Io(#[from] io::Error),
/// IO error when creating the discovery service
#[error("Failed to launch discovery service: {0}")]
Discovery(std::io::Error),
Discovery(io::Error),
/// Error when setting up the DNS resolver failed
///
/// See also [DnsResolver](reth_dns_discovery::DnsResolver::from_system_conf)
#[error("Failed to configure DNS resolver: {0}")]
DnsResolver(#[from] ResolveError),
}
/// Abstraction over errors that can lead to a failed session

View File

@ -161,6 +161,7 @@ where
hello_message,
status,
fork_filter,
dns_discovery_config,
..
} = config;
@ -177,7 +178,9 @@ where
disc_config
});
let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;
let discovery =
Discovery::new(discovery_addr, secret_key, discovery_v4_config, dns_discovery_config)
.await?;
// need to retrieve the addr here since provided port could be `0`
let local_peer_id = discovery.local_id();