feat(discv4): support eth entries (#447)

* add ENRRequest and ENRResponse messages

 * todo: encode and decode impls

* scaffold enrrequest and enrresponse msgs

* implement encodable and decodable for enr

* cargo fmt

* impl sending enr requests

* silence clippy for now

* add todos for enrrequest and response in mocks

* make payload length fold more explicit

* feat: add enr support to ping pong

* integrate enr

* add update methods

* add enr handling

* feat: add enr handling

* integrate fork id

* fix: set frontier forkid

* disable eip868 by default

* address comments

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Dan Cline
2022-12-16 07:14:19 -05:00
committed by GitHub
parent 864e6481da
commit 635203759b
15 changed files with 926 additions and 108 deletions

6
Cargo.lock generated
View File

@ -1160,6 +1160,7 @@ dependencies = [
"log",
"rand 0.8.5",
"rlp",
"secp256k1",
"serde",
"sha3",
"zeroize",
@ -3264,6 +3265,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"discv5",
"enr 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array",
"hex",
"public-ip",
@ -3565,11 +3567,15 @@ dependencies = [
"auto_impl",
"bytes",
"criterion",
"enr 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethereum-types",
"ethnum",
"hex-literal",
"rand 0.8.5",
"reth-rlp",
"reth-rlp-derive",
"rlp",
"secp256k1",
"smol_str",
]

View File

@ -12,6 +12,8 @@ auto_impl = "1"
bytes = { version = "1", default-features = false }
ethnum = { version = "1", default-features = false, optional = true }
smol_str = { version = "0.1", default-features = false, optional = true }
enr = { version = "0.7", default-features = false, optional = true }
rlp = { version = "0.5.2", default-features = false, optional = true }
ethereum-types = { version = "0.14", features = ["codec"], optional = true }
reth-rlp-derive = { version = "0.1", path = "../rlp-derive", optional = true }
@ -21,15 +23,21 @@ reth-rlp = { path = ".", package = "reth-rlp", features = [
"std",
"ethnum",
"ethereum-types",
"enr",
"smol_str"
] }
criterion = "0.4.0"
hex-literal = "0.3"
rand = "0.8"
secp256k1 = { version = "0.24", features = [
"rand-std",
] }
[features]
alloc = []
derive = ["reth-rlp-derive"]
std = ["alloc"]
enr = ["dep:enr", "dep:rlp", "enr/rust-secp256k1", "derive"]
[[bench]]
name = "bench"

View File

@ -384,6 +384,33 @@ impl Decodable for smol_str::SmolStr {
}
}
#[cfg(feature = "enr")]
impl<K> Decodable for enr::Enr<K>
where
K: enr::EnrKey,
{
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
// currently the only way to build an enr is to decode it using the rlp::Decodable trait
<Self as rlp::Decodable>::decode(&rlp::Rlp::new(buf)).map_err(|e| match e {
rlp::DecoderError::RlpIsTooShort => DecodeError::InputTooShort,
rlp::DecoderError::RlpInvalidLength => DecodeError::Overflow,
rlp::DecoderError::RlpExpectedToBeList => DecodeError::UnexpectedString,
rlp::DecoderError::RlpExpectedToBeData => DecodeError::UnexpectedList,
rlp::DecoderError::RlpDataLenWithZeroPrefix |
rlp::DecoderError::RlpListLenWithZeroPrefix => DecodeError::LeadingZero,
rlp::DecoderError::RlpInvalidIndirection => DecodeError::NonCanonicalSize,
rlp::DecoderError::RlpIncorrectListLen => {
DecodeError::Custom("incorrect list length when decoding rlp")
}
rlp::DecoderError::RlpIsTooBig => DecodeError::Custom("rlp is too big"),
rlp::DecoderError::RlpInconsistentLengthAndData => {
DecodeError::Custom("inconsistent length and data when decoding rlp")
}
rlp::DecoderError::Custom(s) => DecodeError::Custom(s),
})
}
}
#[cfg(test)]
mod tests {
extern crate alloc;
@ -602,4 +629,62 @@ mod tests {
(Err(DecodeError::UnexpectedList), &hex!("C0")[..]),
])
}
// test vector from the enr library rlp encoding tests
// <https://github.com/sigp/enr/blob/e59dcb45ea07e423a7091d2a6ede4ad6d8ef2840/src/lib.rs#L1019>
#[cfg(feature = "enr")]
#[test]
fn decode_enr_rlp() {
use enr::{secp256k1::SecretKey, Enr, EnrPublicKey};
use std::net::Ipv4Addr;
let valid_record = hex!("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f");
let signature = hex!("7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c");
let expected_pubkey =
hex!("03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138");
let enr = Enr::<SecretKey>::decode(&mut &valid_record[..]).unwrap();
let pubkey = enr.public_key().encode();
assert_eq!(enr.ip4(), Some(Ipv4Addr::new(127, 0, 0, 1)));
assert_eq!(enr.id(), Some(String::from("v4")));
assert_eq!(enr.udp4(), Some(30303));
assert_eq!(enr.tcp4(), None);
assert_eq!(enr.signature(), &signature[..]);
assert_eq!(pubkey.to_vec(), expected_pubkey);
assert!(enr.verify());
}
// test vector from the enr library rlp encoding tests
// <https://github.com/sigp/enr/blob/e59dcb45ea07e423a7091d2a6ede4ad6d8ef2840/src/lib.rs#LL1206C35-L1206C35>
#[cfg(feature = "enr")]
#[test]
fn encode_decode_enr_rlp() {
use enr::{secp256k1::SecretKey, Enr, EnrBuilder, EnrKey, EnrPublicKey};
use std::net::Ipv4Addr;
let key = SecretKey::new(&mut rand::rngs::OsRng);
let ip = Ipv4Addr::new(127, 0, 0, 1);
let tcp = 3000;
let enr = {
let mut builder = EnrBuilder::new("v4");
builder.ip(ip.into());
builder.tcp4(tcp);
builder.build(&key).unwrap()
};
let mut encoded = BytesMut::new();
enr.encode(&mut encoded);
let decoded_enr = Enr::<SecretKey>::decode(&mut &encoded[..]).unwrap();
assert_eq!(decoded_enr, enr);
assert_eq!(decoded_enr.id(), Some("v4".into()));
assert_eq!(decoded_enr.ip4(), Some(ip));
assert_eq!(decoded_enr.tcp4(), Some(tcp));
assert_eq!(decoded_enr.public_key().encode(), key.public().encode());
assert!(decoded_enr.verify());
}
}

View File

@ -185,6 +185,38 @@ impl Encodable for smol_str::SmolStr {
}
}
#[cfg(feature = "enr")]
impl<K> Encodable for enr::Enr<K>
where
K: enr::EnrKey,
{
fn encode(&self, out: &mut dyn BufMut) {
let payload_length = self.signature().length() +
self.seq().length() +
self.iter().fold(0, |acc, (k, v)| acc + k.as_slice().length() + v.len());
let header = Header { list: true, payload_length };
header.encode(out);
self.signature().encode(out);
self.seq().encode(out);
for (k, v) in self.iter() {
// Keys are byte data
k.as_slice().encode(out);
// Values are raw RLP encoded data
out.put_slice(v);
}
}
fn length(&self) -> usize {
let payload_length = self.signature().length() +
self.seq().length() +
self.iter().fold(0, |acc, (k, v)| acc + k.as_slice().length() + v.len());
payload_length + length_of_length(payload_length)
}
}
#[cfg(feature = "ethnum")]
mod ethnum_support {
use super::*;
@ -577,4 +609,37 @@ mod tests {
"abcdefgh".to_string().encode(&mut b);
assert_eq!(&encoded(SmolStr::new("abcdefgh"))[..], b.as_ref());
}
// test vector from the enr library rlp encoding tests
// <https://github.com/sigp/enr/blob/e59dcb45ea07e423a7091d2a6ede4ad6d8ef2840/src/lib.rs#L1019>
#[cfg(feature = "enr")]
#[test]
fn encode_known_rlp_enr() {
use crate::Decodable;
use enr::{secp256k1::SecretKey, Enr, EnrPublicKey};
use std::net::Ipv4Addr;
let valid_record = hex!("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f");
let signature = hex!("7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c");
let expected_pubkey =
hex!("03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138");
let enr = Enr::<SecretKey>::decode(&mut &valid_record[..]).unwrap();
let pubkey = enr.public_key().encode();
assert_eq!(enr.ip4(), Some(Ipv4Addr::new(127, 0, 0, 1)));
assert_eq!(enr.id(), Some(String::from("v4")));
assert_eq!(enr.udp4(), Some(30303));
assert_eq!(enr.tcp4(), None);
assert_eq!(enr.signature(), &signature[..]);
assert_eq!(pubkey.to_vec(), expected_pubkey);
assert!(enr.verify());
let mut encoded = BytesMut::new();
enr.encode(&mut encoded);
assert_eq!(&encoded[..], &valid_record[..]);
// ensure the length is equal
assert_eq!(enr.length(), valid_record.len());
}
}

View File

@ -12,7 +12,7 @@ Ethereum network support
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-rlp = { path = "../../common/rlp" }
reth-rlp = { path = "../../common/rlp", features = ["enr"] }
reth-rlp-derive = { path = "../../common/rlp-derive" }
reth-net-common = { path = "../common" }
@ -23,15 +23,16 @@ secp256k1 = { version = "0.24", features = [
"rand-std",
"recovery",
] }
enr = { version = "0.7.0", default-features = false, features = ["rust-secp256k1"] }
# async/futures
tokio = { version = "1", features = ["io-util", "net", "time"] }
tokio-stream = "0.1"
# misc
bytes = "1.2"
generic-array = "0.14"
tracing = "0.1"
bytes = "1.2"
thiserror = "1.0"
url = "2.3"
hex = "0.4"

View File

@ -1,10 +1,16 @@
//! A set of configuration parameters to tune the discovery protocol.
//!
//! This basis of this file has been taken from the discv5 codebase:
//! https://github.com/sigp/discv5
use crate::node::NodeRecord;
use bytes::{Bytes, BytesMut};
use reth_net_common::ban_list::BanList;
///! A set of configuration parameters to tune the discovery protocol.
// This basis of this file has been taken from the discv5 codebase:
// https://github.com/sigp/discv5
use std::collections::HashSet;
use std::time::Duration;
use reth_rlp::Encodable;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
/// Configuration parameters that define the performance of the discovery network.
#[derive(Clone, Debug)]
@ -21,7 +27,9 @@ pub struct Discv4Config {
/// The rate at which lookups should be triggered.
pub lookup_interval: Duration,
/// The duration of we consider a FindNode request timed out.
pub find_node_timeout: Duration,
pub request_timeout: Duration,
/// The duration after which we consider an enr request timed out.
pub enr_timeout: Duration,
/// The duration we set for neighbours responses
pub neighbours_timeout: Duration,
/// Provides a way to ban peers and ips.
@ -38,6 +46,10 @@ pub struct Discv4Config {
pub enable_dht_random_walk: bool,
/// Whether to automatically lookup peers.
pub enable_lookup: bool,
/// Whether to enable EIP-868 extension
pub enable_eip868: bool,
/// Additional pairs to include in The [`Enr`](enr::Enr) if EIP-868 extension is enabled <https://eips.ethereum.org/EIPS/eip-868>
pub additional_eip868_rlp_pairs: HashMap<Vec<u8>, Bytes>,
}
impl Discv4Config {
@ -45,6 +57,30 @@ impl Discv4Config {
pub fn builder() -> Discv4ConfigBuilder {
Default::default()
}
/// Add another key value pair to include in the ENR
pub fn add_eip868_pair(&mut self, key: impl AsRef<[u8]>, value: impl Encodable) -> &mut Self {
let mut buf = BytesMut::new();
value.encode(&mut buf);
self.add_eip868_rlp_pair(key, buf.freeze())
}
/// Add another key value pair to include in the ENR
pub fn add_eip868_rlp_pair(&mut self, key: impl AsRef<[u8]>, rlp: Bytes) -> &mut Self {
self.additional_eip868_rlp_pairs.insert(key.as_ref().to_vec(), rlp);
self
}
/// Extend additional key value pairs to include in the ENR
pub fn extend_eip868_rlp_pairs(
&mut self,
pairs: impl IntoIterator<Item = (impl AsRef<[u8]>, Bytes)>,
) -> &mut Self {
for (k, v) in pairs.into_iter() {
self.add_eip868_rlp_pair(k, v);
}
self
}
}
impl Default for Discv4Config {
@ -54,14 +90,17 @@ impl Default for Discv4Config {
request_retries: 1,
ping_interval: Duration::from_secs(300),
ping_timeout: Duration::from_secs(5),
find_node_timeout: Duration::from_secs(2),
neighbours_timeout: Duration::from_secs(30),
request_timeout: Duration::from_secs(20),
enr_timeout: Duration::from_secs(5),
neighbours_timeout: Duration::from_secs(5),
lookup_interval: Duration::from_secs(20),
ban_list: Default::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
bootstrap_nodes: Default::default(),
enable_dht_random_walk: true,
enable_lookup: true,
enable_eip868: true,
additional_eip868_rlp_pairs: Default::default(),
}
}
}
@ -97,6 +136,12 @@ impl Discv4ConfigBuilder {
self
}
/// Sets the timeout for enr requests
pub fn enr_request_timeout(&mut self, duration: Duration) -> &mut Self {
self.config.enr_timeout = duration;
self
}
/// Whether to discover random nodes in the DHT.
pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self {
self.config.enable_dht_random_walk = enable_dht_random_walk;
@ -109,6 +154,36 @@ impl Discv4ConfigBuilder {
self
}
/// Whether to enable EIP-868
pub fn enable_eip868(&mut self, enable_eip868: bool) -> &mut Self {
self.config.enable_eip868 = enable_eip868;
self
}
/// Add another key value pair to include in the ENR
pub fn add_eip868_pair(&mut self, key: impl AsRef<[u8]>, value: impl Encodable) -> &mut Self {
let mut buf = BytesMut::new();
value.encode(&mut buf);
self.add_eip868_rlp_pair(key, buf.freeze())
}
/// Add another key value pair to include in the ENR
pub fn add_eip868_rlp_pair(&mut self, key: impl AsRef<[u8]>, rlp: Bytes) -> &mut Self {
self.config.additional_eip868_rlp_pairs.insert(key.as_ref().to_vec(), rlp);
self
}
/// Extend additional key value pairs to include in the ENR
pub fn extend_eip868_rlp_pairs(
&mut self,
pairs: impl IntoIterator<Item = (impl AsRef<[u8]>, Bytes)>,
) -> &mut Self {
for (k, v) in pairs.into_iter() {
self.add_eip868_rlp_pair(k, v);
}
self
}
/// A set of lists that can ban IP's or PeerIds from the server. See
/// [`BanList`].
pub fn ban_list(&mut self, ban_list: BanList) -> &mut Self {

View File

@ -22,15 +22,18 @@ use crate::{
node::{kad_key, NodeKey},
proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use discv5::{
kbucket,
kbucket::{
Distance, Entry as BucketEntry, FailureReason, InsertResult, KBucketsTable, NodeStatus,
MAX_NODES_PER_BUCKET,
},
ConnectionDirection, ConnectionState,
};
use reth_primitives::{PeerId, H256};
use enr::{Enr, EnrBuilder};
use proto::{EnrRequest, EnrResponse};
use reth_primitives::{ForkId, PeerId, H256};
use secp256k1::SecretKey;
use std::{
cell::RefCell,
@ -169,19 +172,18 @@ impl Discv4 {
/// ```
pub async fn bind(
local_address: SocketAddr,
mut local_enr: NodeRecord,
mut local_node_record: NodeRecord,
secret_key: SecretKey,
config: Discv4Config,
) -> io::Result<(Self, Discv4Service)> {
let socket = UdpSocket::bind(local_address).await?;
let local_addr = socket.local_addr()?;
local_enr.udp_port = local_addr.port();
local_node_record.udp_port = local_addr.port();
trace!( target : "discv4", ?local_addr,"opened UDP socket");
// We don't expect many commands, so the buffer can be quite small here.
let (to_service, rx) = mpsc::channel(5);
let (to_service, rx) = mpsc::channel(100);
let service =
Discv4Service::new(socket, local_addr, local_enr, secret_key, config, Some(rx));
Discv4Service::new(socket, local_addr, local_node_record, secret_key, config, Some(rx));
let discv4 = Discv4 { local_addr, to_service };
Ok((discv4, service))
}
@ -230,8 +232,7 @@ impl Discv4 {
/// Removes the peer from the table, if it exists.
pub fn remove_peer(&self, node_id: PeerId) {
let cmd = Discv4Command::Remove(node_id);
// we want this message to arrive, so we clone the sender
let _ = self.to_service.clone().try_send(cmd);
self.safe_send_to_service(cmd);
}
/// Adds the peer and id to the ban list.
@ -239,16 +240,14 @@ impl Discv4 {
/// This will prevent any future inclusion in the table
pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
let cmd = Discv4Command::Ban(node_id, ip);
// we want this message to arrive, so we clone the sender
let _ = self.to_service.clone().try_send(cmd);
self.safe_send_to_service(cmd);
}
/// Adds the ip to the ban list.
///
/// This will prevent any future inclusion in the table
pub fn ban_ip(&self, ip: IpAddr) {
let cmd = Discv4Command::BanIp(ip);
// we want this message to arrive, so we clone the sender
let _ = self.to_service.clone().try_send(cmd);
self.safe_send_to_service(cmd);
}
/// Adds the peer to the ban list.
@ -256,7 +255,38 @@ impl Discv4 {
/// This will prevent any future inclusion in the table
pub fn ban_node(&self, node_id: PeerId) {
let cmd = Discv4Command::BanPeer(node_id);
// we want this message to arrive, so we clone the sender
self.safe_send_to_service(cmd);
}
/// Sets the tcp port
///
/// This will update our [`NodeRecord`]'s tcp port.
pub fn set_tcp_port(&self, port: u16) {
let cmd = Discv4Command::SetTcpPort(port);
self.safe_send_to_service(cmd);
}
/// Sets the pair in the EIP-868 [`Enr`] of the node.
///
/// If the key already exists, this will update it.
///
/// CAUTION: The value **must** be rlp encoded
pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
self.safe_send_to_service(cmd);
}
/// Sets the pair in the EIP-868 [`Enr`] of the node.
///
/// If the key already exists, this will update it.
pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl reth_rlp::Encodable) {
let mut buf = BytesMut::new();
value.encode(&mut buf);
self.set_eip868_rlp_pair(key, buf.freeze())
}
fn safe_send_to_service(&self, cmd: Discv4Command) {
// we want this message to always arrive, so we clone the sender
let _ = self.to_service.clone().try_send(cmd);
}
@ -284,8 +314,10 @@ impl Discv4 {
pub struct Discv4Service {
/// Local address of the UDP socket.
local_address: SocketAddr,
/// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
local_eip_868_enr: Enr<SecretKey>,
/// Local ENR of the server.
local_enr: NodeRecord,
local_node_record: NodeRecord,
/// The secret key used to sign payloads
secret_key: SecretKey,
/// The UDP socket for sending and receiving messages.
@ -313,6 +345,8 @@ pub struct Discv4Service {
pending_pings: HashMap<PeerId, PingRequest>,
/// Currently active FindNode requests
pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
/// Currently active ENR requests
pending_enr_requests: HashMap<PeerId, EnrRequestState>,
/// Commands listener
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
/// All subscribers for table updates
@ -334,7 +368,7 @@ impl Discv4Service {
pub(crate) fn new(
socket: UdpSocket,
local_address: SocketAddr,
local_enr: NodeRecord,
local_node_record: NodeRecord,
secret_key: SecretKey,
config: Discv4Config,
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
@ -349,13 +383,13 @@ impl Discv4Service {
let mut tasks = JoinSet::<()>::new();
let udp = Arc::clone(&socket);
tasks.spawn(async move { receive_loop(udp, ingress_tx, local_enr.id).await });
tasks.spawn(async move { receive_loop(udp, ingress_tx, local_node_record.id).await });
let udp = Arc::clone(&socket);
tasks.spawn(async move { send_loop(udp, egress_rx).await });
let kbuckets = KBucketsTable::new(
local_enr.key(),
local_node_record.key(),
Duration::from_secs(60),
MAX_NODES_PER_BUCKET,
None,
@ -364,9 +398,17 @@ impl Discv4Service {
let self_lookup_interval = tokio::time::interval(config.lookup_interval);
let ping_interval = tokio::time::interval(config.ping_interval);
// Wait `ping_interval` and then start pinging every `ping_interval` because we want to wait
// for
let ping_interval = tokio::time::interval_at(
tokio::time::Instant::now() + config.ping_interval,
config.ping_interval,
);
let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout);
let evict_expired_requests_interval = tokio::time::interval_at(
tokio::time::Instant::now() + config.request_timeout,
config.request_timeout,
);
let lookup_rotator = if config.enable_dht_random_walk {
LookupTargetRotator::default()
@ -374,9 +416,29 @@ impl Discv4Service {
LookupTargetRotator::local_only()
};
// for EIP-868 construct an ENR
let local_eip_868_enr = {
let mut builder = EnrBuilder::new("v4");
builder.ip(local_node_record.address);
if local_node_record.address.is_ipv4() {
builder.udp4(local_node_record.udp_port);
builder.tcp4(local_node_record.tcp_port);
} else {
builder.udp6(local_node_record.udp_port);
builder.tcp6(local_node_record.tcp_port);
}
for (key, val) in config.additional_eip868_rlp_pairs.iter() {
builder.add_value_rlp(key, val.clone());
}
builder.build(&secret_key).expect("v4 is set; qed")
};
Discv4Service {
local_address,
local_enr,
local_eip_868_enr,
local_node_record,
_socket: socket,
kbuckets,
secret_key,
@ -386,6 +448,7 @@ impl Discv4Service {
queued_pings: Default::default(),
pending_pings: Default::default(),
pending_find_nodes: Default::default(),
pending_enr_requests: Default::default(),
check_timestamps: false,
commands_rx,
update_listeners: Vec::with_capacity(1),
@ -397,6 +460,15 @@ impl Discv4Service {
}
}
/// Returns the current enr sequence
fn enr_seq(&self) -> Option<u64> {
if self.config.enable_eip868 {
Some(self.local_eip_868_enr.seq())
} else {
None
}
}
/// Returns the address of the UDP socket
pub fn local_addr(&self) -> SocketAddr {
self.local_address
@ -404,13 +476,13 @@ impl Discv4Service {
/// Returns the ENR of this service.
pub fn local_enr(&self) -> NodeRecord {
self.local_enr
self.local_node_record
}
/// Returns mutable reference to ENR for testing.
#[cfg(test)]
pub fn local_enr_mut(&mut self) -> &mut NodeRecord {
&mut self.local_enr
&mut self.local_node_record
}
/// Returns true if the given PeerId is currently in the bucket
@ -437,7 +509,8 @@ impl Discv4Service {
for record in self.config.bootstrap_nodes.clone() {
debug!(target : "discv4", ?record, "Adding bootstrap node");
let key = kad_key(record.id);
let entry = NodeEntry { record, last_seen: Instant::now() };
let entry =
NodeEntry { record, last_seen: Instant::now(), last_enr_seq: None, fork_id: None };
// insert the boot node in the table
let _ = self.kbuckets.insert_or_update(
@ -448,8 +521,7 @@ impl Discv4Service {
direction: ConnectionDirection::Outgoing,
},
);
self.try_ping(record, PingReason::Normal);
self.try_ping(record, PingReason::Initial);
}
}
@ -474,7 +546,7 @@ impl Discv4Service {
/// Looks up the local node in the DHT.
pub fn lookup_self(&mut self) {
self.lookup(self.local_enr.id)
self.lookup(self.local_node_record.id)
}
/// Looks up the given node in the DHT
@ -578,41 +650,128 @@ impl Discv4Service {
let key = kad_key(node_id);
let removed = self.kbuckets.remove(&key);
if removed {
debug!(target: "discv4", ?node_id, "removed node");
self.notify(DiscoveryUpdate::Removed(node_id));
}
removed
}
/// Updates the node entry
fn update_node(&mut self, record: NodeRecord) {
if record.id == self.local_enr.id {
/// Update the entry on RE-ping
///
/// On re-ping we check for a changed enr_seq if eip868 is enabled and when it changed we sent a
/// followup request to retrieve the updated ENR
fn update_on_reping(&mut self, record: NodeRecord, last_enr_seq: Option<u64>) {
if record.id == self.local_node_record.id {
return
}
let last_enr_seq = if let Some(last_enr_seq) = last_enr_seq {
last_enr_seq
} else {
// no need to check for increased enr seq
let _ = self.insert_or_update(record, None);
return
};
let key = kad_key(record.id);
let entry = NodeEntry { record, last_seen: Instant::now() };
match self.kbuckets.insert_or_update(
let mut fork_id = None;
let mut unchanged = false;
if let kbucket::Entry::Present(entry, _) = self.kbuckets.entry(&key) {
let value = entry.value();
// we want to keep the forkid if it was set previously and only update the timestamp
if let Some(current_enr) = value.last_enr_seq {
if last_enr_seq == current_enr {
unchanged = true;
// ENR sequence is unchanged, so we can keep the fork id
fork_id = value.fork_id;
}
}
}
// update the value but keep the existing forkid
if unchanged || fork_id.is_some() {
let entry = NodeEntry {
record,
last_seen: Instant::now(),
last_enr_seq: Some(last_enr_seq),
fork_id,
};
let _ = self.kbuckets.insert_or_update(
&key,
entry,
NodeStatus {
state: ConnectionState::Connected,
direction: ConnectionDirection::Outgoing,
},
);
} else {
// update the value and re-request the ENR
let _ = self.insert_or_update(record, Some(last_enr_seq));
}
}
/// Updates the node entry
///
/// Inserts the given node into the table.
///
/// If EIP-868 is enabled this will send a followup request to ask for the ENR of the node, if
/// the given enr seq is `Some`.
fn insert_or_update(
&mut self,
record: NodeRecord,
mut last_enr_seq: Option<u64>,
) -> InsertResult<NodeKey> {
if record.id == self.local_node_record.id {
return InsertResult::Failed(FailureReason::InvalidSelfUpdate)
}
// If EIP868 extension is disabled then we want to ignore this
if !self.config.enable_eip868 {
last_enr_seq = None;
}
// if the peer included a enr seq in the pong then we can try to request the ENR of that
// node
let has_enr_seq = last_enr_seq.is_some();
let key = kad_key(record.id);
let entry = NodeEntry { record, last_seen: Instant::now(), last_enr_seq, fork_id: None };
let res = self.kbuckets.insert_or_update(
&key,
entry,
NodeStatus {
state: ConnectionState::Connected,
direction: ConnectionDirection::Outgoing,
},
) {
);
match &res {
InsertResult::Inserted => {
debug!(target : "discv4",?record, "inserted new record to table");
self.notify(DiscoveryUpdate::Added(record));
if has_enr_seq {
// request the ENR of the node
self.send_enr_request(record);
}
}
InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => {
InsertResult::ValueUpdated => {
trace!(target : "discv4",?record, "updated record");
if has_enr_seq {
// request the ENR of the node
self.send_enr_request(record);
}
}
InsertResult::Failed(FailureReason::BucketFull) => {
debug!(target : "discv4", ?record, "discovered new record but bucket is full");
self.notify(DiscoveryUpdate::Discovered(record));
trace!(target : "discv4", ?record, "discovered new record but bucket is full");
}
res => {
warn!(target : "discv4",?record, ?res, "failed to insert");
}
}
};
res
}
/// Adds all nodes
@ -627,13 +786,8 @@ impl Discv4Service {
/// If the node's not in the table yet, this will start a ping to get it added on ping.
pub fn add_node(&mut self, record: NodeRecord) {
let key = kad_key(record.id);
#[allow(clippy::single_match)]
match self.kbuckets.entry(&key) {
BucketEntry::Absent(_) => self.try_ping(record, PingReason::Normal),
_ => {
// is already in the table
}
if let BucketEntry::Absent(_) = self.kbuckets.entry(&key) {
self.try_ping(record, PingReason::Initial)
}
}
@ -664,7 +818,12 @@ impl Discv4Service {
self.add_node(record);
// send the pong
let msg = Message::Pong(Pong { to: ping.from, echo: hash, expire: ping.expire });
let msg = Message::Pong(Pong {
to: ping.from,
echo: hash,
expire: ping.expire,
enr_sq: self.enr_seq(),
});
self.send_packet(msg, remote_addr);
}
@ -693,8 +852,12 @@ impl Discv4Service {
pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> H256 {
let remote_addr = node.udp_addr();
let id = node.id;
let ping =
Ping { from: self.local_enr.into(), to: node.into(), expire: self.ping_timeout() };
let ping = Ping {
from: self.local_node_record.into(),
to: node.into(),
expire: self.ping_timeout(),
enr_sq: self.enr_seq(),
};
trace!(target : "discv4", ?ping, "sending ping");
let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
@ -703,6 +866,23 @@ impl Discv4Service {
echo_hash
}
/// Sends an enr request message to the node's UDP address.
///
/// Returns the echo hash of the ping message.
pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
if !self.config.enable_eip868 {
return
}
let remote_addr = node.udp_addr();
let enr_request = EnrRequest { expire: self.enr_request_timeout() };
trace!(target : "discv4", ?enr_request, "sending enr request");
let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
self.pending_enr_requests
.insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
}
/// Message handler for an incoming `Pong`.
fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
if self.is_expired(pong.expire) {
@ -724,15 +904,18 @@ impl Discv4Service {
};
match reason {
PingReason::Normal => {
self.update_node(node);
PingReason::Initial => {
let _ = self.insert_or_update(node, pong.enr_sq);
}
PingReason::RePing => {
self.update_on_reping(node, pong.enr_sq);
}
PingReason::FindNode(target, status) => {
// update the status of the node
match status {
NodeEntryStatus::Expired | NodeEntryStatus::Valid => {
// update node in the table
self.update_node(node)
let _ = self.insert_or_update(node, pong.enr_sq);
}
NodeEntryStatus::IsLocal | NodeEntryStatus::Unknown => {}
}
@ -740,7 +923,7 @@ impl Discv4Service {
self.respond_closest(target, remote_addr);
}
PingReason::Lookup(node, ctx) => {
self.update_node(node);
let _ = self.insert_or_update(node, pong.enr_sq);
self.find_node(&node, ctx);
}
}
@ -766,6 +949,70 @@ impl Discv4Service {
}
}
/// Handler for incoming `EnrResponse` message
fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
trace!(target : "discv4", ?remote_addr, ?msg, "received ENR response");
if let Some(resp) = self.pending_enr_requests.remove(&id) {
if resp.echo_hash == msg.request_hash {
let key = kad_key(id);
if let Some((record, existing_fork_id)) =
self.kbuckets.get_bucket(&key).and_then(|bucket| {
bucket.get(&key).map(|entry| (entry.value.record, entry.value.fork_id))
})
{
let fork_id = msg.eth_fork_id();
let entry = NodeEntry {
record,
last_seen: Instant::now(),
last_enr_seq: Some(msg.enr.seq()),
fork_id,
};
let _ = self.kbuckets.insert_or_update(
&key,
entry,
NodeStatus {
state: ConnectionState::Connected,
direction: ConnectionDirection::Outgoing,
},
);
if fork_id == existing_fork_id {
// nothing to update
return
}
if let Some(fork_id) = fork_id {
self.notify(DiscoveryUpdate::EnrForkId(record, fork_id))
}
}
}
}
}
/// Handler for incoming `EnrRequest` message
fn on_enr_request(
&mut self,
msg: EnrRequest,
remote_addr: SocketAddr,
id: PeerId,
request_hash: H256,
) {
if !self.config.enable_eip868 || self.is_expired(msg.expire) {
return
}
let key = kad_key(id);
if let BucketEntry::Present(_, _) = self.kbuckets.entry(&key) {
self.send_packet(
Message::EnrResponse(EnrResponse {
request_hash,
enr: self.local_eip_868_enr.clone(),
}),
remote_addr,
);
}
}
/// Handler for incoming `Neighbours` messages that are handled if they're responses to
/// `FindNode` requests
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
@ -800,7 +1047,7 @@ impl Discv4Service {
}
};
let our_key = kad_key(self.local_enr.id);
let our_key = kad_key(self.local_node_record.id);
// This is the recursive lookup step where we initiate new FindNode requests for new nodes
// that where discovered.
@ -846,7 +1093,7 @@ impl Discv4Service {
/// Returns the current status of the node
fn node_status(&mut self, node: PeerId, addr: SocketAddr) -> NodeEntryStatus {
if node == self.local_enr.id {
if node == self.local_node_record.id {
debug!( target : "discv4", ?node,"Got an incoming discovery request from self");
return NodeEntryStatus::IsLocal
}
@ -873,6 +1120,10 @@ impl Discv4Service {
}
fn evict_expired_requests(&mut self, now: Instant) {
self.pending_enr_requests.retain(|_node_id, enr_request| {
now.duration_since(enr_request.sent_at) < self.config.ping_timeout
});
let mut nodes_to_expire = Vec::new();
self.pending_pings.retain(|node_id, ping_request| {
if now.duration_since(ping_request.sent_at) > self.config.ping_timeout {
@ -882,7 +1133,7 @@ impl Discv4Service {
true
});
self.pending_find_nodes.retain(|node_id, find_node_request| {
if now.duration_since(find_node_request.sent_at) > self.config.find_node_timeout {
if now.duration_since(find_node_request.sent_at) > self.config.request_timeout {
if !find_node_request.answered {
nodes_to_expire.push(*node_id);
}
@ -890,8 +1141,11 @@ impl Discv4Service {
}
true
});
debug!(target: "discv4", num=%nodes_to_expire.len(), "evicting nodes");
for node_id in nodes_to_expire {
self.expire_node_request(node_id);
self.remove_node(node_id);
}
}
@ -901,16 +1155,10 @@ impl Discv4Service {
nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
for node in to_ping {
self.try_ping(node, PingReason::Normal)
self.try_ping(node, PingReason::RePing)
}
}
/// Removes the node from the table
fn expire_node_request(&mut self, node_id: PeerId) {
let key = kad_key(node_id);
self.kbuckets.remove(&key);
}
/// Returns true if the expiration timestamp is considered invalid.
fn is_expired(&self, expiration: u64) -> bool {
self.ensure_timestamp(expiration).is_err()
@ -941,10 +1189,14 @@ impl Discv4Service {
}
fn find_node_timeout(&self) -> u64 {
(SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.find_node_timeout)
(SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
.as_secs()
}
fn enr_request_timeout(&self) -> u64 {
(SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_timeout).as_secs()
}
fn send_neighbours_timeout(&self) -> u64 {
(SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_timeout)
.as_secs()
@ -958,7 +1210,7 @@ impl Discv4Service {
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
// trigger self lookup
if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
let target = self.lookup_rotator.next(&self.local_enr.id);
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
}
@ -979,7 +1231,7 @@ impl Discv4Service {
if let Some(cmd) = cmd {
match cmd {
Discv4Command::Lookup { node_id, tx } => {
let node_id = node_id.unwrap_or(self.local_enr.id);
let node_id = node_id.unwrap_or(self.local_node_record.id);
self.lookup_with(node_id, tx);
}
Discv4Command::Updates(tx) => {
@ -997,6 +1249,21 @@ impl Discv4Service {
Discv4Command::BanIp(ip) => {
self.ban_ip(ip);
}
Discv4Command::SetEIP868RLPPair { key, rlp } => {
debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
let _ =
self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
}
Discv4Command::SetTcpPort(port) => {
debug!(target: "discv4", %port, "Update tcp port");
self.local_node_record.tcp_port = port;
if self.local_node_record.address.is_ipv4() {
let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
} else {
let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
}
}
}
} else {
is_done = true;
@ -1034,6 +1301,14 @@ impl Discv4Service {
self.on_neighbours(msg, remote_addr, node_id);
Poll::Ready(Discv4Event::Neighbours)
}
Message::EnrRequest(msg) => {
self.on_enr_request(msg, remote_addr, node_id, hash);
Poll::Ready(Discv4Event::EnrRequest)
}
Message::EnrResponse(msg) => {
self.on_enr_response(msg, remote_addr, node_id);
Poll::Ready(Discv4Event::EnrResponse)
}
}
}
}
@ -1068,6 +1343,10 @@ pub enum Discv4Event {
FindNode,
/// A `Neighbours` message was handled.
Neighbours,
/// A `EnrRequest` message was handled.
EnrRequest,
/// A `EnrResponse` message was handled.
EnrResponse,
}
/// Continuously reads new messages from the channel and writes them to the socket
@ -1128,6 +1407,8 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
/// The commands sent from the frontend to the service
enum Discv4Command {
SetTcpPort(u16),
SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
Ban(PeerId, IpAddr),
BanPeer(PeerId),
BanIp(IpAddr),
@ -1332,6 +1613,13 @@ impl FindNodeRequest {
}
}
struct EnrRequestState {
// Timestamp when the request was sent.
sent_at: Instant,
// Hash sent in the Ping request
echo_hash: H256,
}
/// Stored node info.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
@ -1339,6 +1627,10 @@ struct NodeEntry {
record: NodeRecord,
/// Timestamp of last pong.
last_seen: Instant,
/// Last enr seq we retrieved via a ENR request.
last_enr_seq: Option<u64>,
/// ForkId if retrieved via ENR requests.
fork_id: Option<ForkId>,
}
/// The status ofs a specific node
@ -1365,8 +1657,10 @@ impl NodeEntry {
/// Represents why a ping is issued
enum PingReason {
/// Standard ping
Normal,
/// Initial ping to a previously unknown peer.
Initial,
/// Re-ping a peer..
RePing,
/// Ping issued to adhere to endpoint proof procedure
///
/// Once the expected PONG is received, the endpoint proof is complete and the find node can be
@ -1379,10 +1673,10 @@ enum PingReason {
/// Represents node related updates state changes in the underlying node table
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
/// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
EnrForkId(NodeRecord, ForkId),
/// A new node was discovered _and_ added to the table.
Added(NodeRecord),
/// A new node was discovered but _not_ added to the table because the bucket is full.
Discovered(NodeRecord),
/// Node that was removed from the table
Removed(PeerId),
/// A series of updates
@ -1396,6 +1690,7 @@ mod tests {
bootnodes::mainnet_nodes,
mock::{create_discv4, create_discv4_with_config},
};
use reth_primitives::{hex_literal::hex, ForkHash};
#[test]
fn test_local_rotator() {
@ -1434,9 +1729,14 @@ mod tests {
#[ignore]
async fn test_lookup() {
reth_tracing::init_tracing();
let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
let all_nodes = mainnet_nodes();
let config = Discv4Config::builder().add_boot_nodes(all_nodes).build();
let config = Discv4Config::builder()
.add_boot_nodes(all_nodes)
.lookup_interval(Duration::from_secs(1))
.add_eip868_pair("eth", fork_id)
.build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
let mut updates = service.update_stream();
@ -1446,6 +1746,9 @@ mod tests {
let mut table = HashMap::new();
while let Some(update) = updates.next().await {
match update {
DiscoveryUpdate::EnrForkId(record, fork_id) => {
println!("{record:?}, {fork_id:?}");
}
DiscoveryUpdate::Added(record) => {
table.insert(record.id, record);
}

View File

@ -9,7 +9,7 @@ use crate::{
IngressReceiver, PeerId, SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS,
};
use rand::{thread_rng, Rng, RngCore};
use reth_primitives::H256;
use reth_primitives::{hex_literal::hex, ForkHash, ForkId, H256};
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::{HashMap, HashSet},
@ -154,7 +154,12 @@ impl Stream for MockDiscovery {
IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => match msg {
Message::Ping(ping) => {
if this.pending_pongs.remove(&node_id) {
let pong = Pong { to: ping.from, echo: hash, expire: ping.expire };
let pong = Pong {
to: ping.from,
echo: hash,
expire: ping.expire,
enr_sq: None,
};
let msg = Message::Pong(pong.clone());
this.send_packet(msg, remote_addr);
return Poll::Ready(Some(MockEvent::Pong {
@ -179,6 +184,7 @@ impl Stream for MockDiscovery {
}
}
Message::Neighbours(_) => {}
Message::EnrRequest(_) | Message::EnrResponse(_) => todo!(),
},
}
}
@ -201,7 +207,8 @@ pub enum MockCommand {
/// Creates a new testing instance for [`Discv4`] and its service
pub async fn create_discv4() -> (Discv4, Discv4Service) {
create_discv4_with_config(Default::default()).await
let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
create_discv4_with_config(Discv4Config::builder().add_eip868_pair("eth", fork_id).build()).await
}
/// Creates a new testing instance for [`Discv4`] and its service with the given config.
@ -254,8 +261,14 @@ pub fn rng_message(rng: &mut impl RngCore) -> Message {
from: rng_endpoint(rng),
to: rng_endpoint(rng),
expire: rng.gen(),
enr_sq: None,
}),
2 => Message::Pong(Pong {
to: rng_endpoint(rng),
echo: H256::random(),
expire: rng.gen(),
enr_sq: None,
}),
2 => Message::Pong(Pong { to: rng_endpoint(rng), echo: H256::random(), expire: rng.gen() }),
3 => Message::FindNode(FindNode { id: PeerId::random(), expire: rng.gen() }),
4 => {
let num: usize = rng.gen_range(1..=SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS);
@ -272,6 +285,7 @@ pub fn rng_message(rng: &mut impl RngCore) -> Message {
mod tests {
use super::*;
use crate::{Discv4Event, PingReason};
use reth_primitives::{hex_literal::hex, ForkHash, ForkId};
use std::net::{IpAddr, Ipv4Addr};
/// This test creates two local UDP sockets. The mocked discovery service responds to specific
@ -297,7 +311,7 @@ mod tests {
mockv4.queue_pong(discv_enr.id);
// This sends a ping to the mock service
let echo_hash = service.send_ping(mock_enr, PingReason::Normal);
let echo_hash = service.send_ping(mock_enr, PingReason::Initial);
// process the mock pong
let event = mockv4.next().await.unwrap();

View File

@ -39,6 +39,9 @@ pub(crate) fn kad_key(node: PeerId) -> discv5::Key<NodeKey> {
}
/// Represents a ENR in discv4.
///
/// Note: this is only an excerpt of the [ENR](enr::Enr) datastructure which is sent in Neighbours
/// message.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct NodeRecord {
/// The Address of a node.

View File

@ -2,7 +2,8 @@
use crate::{error::DecodePacketError, node::NodeRecord, PeerId, MAX_PACKET_SIZE, MIN_PACKET_SIZE};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use reth_primitives::{keccak256, H256};
use enr::Enr;
use reth_primitives::{keccak256, ForkId, H256};
use reth_rlp::{Decodable, DecodeError, Encodable, Header};
use reth_rlp_derive::{RlpDecodable, RlpEncodable};
use secp256k1::{
@ -21,6 +22,8 @@ pub enum MessageId {
Pong = 2,
FindNode = 3,
Neighbours = 4,
EnrRequest = 5,
EnrResponse = 6,
}
impl MessageId {
@ -31,6 +34,8 @@ impl MessageId {
2 => MessageId::Pong,
3 => MessageId::FindNode,
4 => MessageId::Neighbours,
5 => MessageId::EnrRequest,
6 => MessageId::EnrResponse,
_ => return Err(msg),
};
Ok(msg)
@ -44,6 +49,8 @@ pub enum Message {
Pong(Pong),
FindNode(FindNode),
Neighbours(Neighbours),
EnrRequest(EnrRequest),
EnrResponse(EnrResponse),
}
// === impl Message ===
@ -56,6 +63,8 @@ impl Message {
Message::Pong(_) => MessageId::Pong,
Message::FindNode(_) => MessageId::FindNode,
Message::Neighbours(_) => MessageId::Neighbours,
Message::EnrRequest(_) => MessageId::EnrRequest,
Message::EnrResponse(_) => MessageId::EnrResponse,
}
}
@ -89,6 +98,14 @@ impl Message {
payload.put_u8(4);
message.encode(&mut payload);
}
Message::EnrRequest(message) => {
payload.put_u8(5);
message.encode(&mut payload);
}
Message::EnrResponse(message) => {
payload.put_u8(6);
message.encode(&mut payload);
}
}
let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable(
@ -146,6 +163,8 @@ impl Message {
MessageId::Pong => Message::Pong(Pong::decode(payload)?),
MessageId::FindNode => Message::FindNode(FindNode::decode(payload)?),
MessageId::Neighbours => Message::Neighbours(Neighbours::decode(payload)?),
MessageId::EnrRequest => Message::EnrRequest(EnrRequest::decode(payload)?),
MessageId::EnrResponse => Message::EnrResponse(EnrResponse::decode(payload)?),
};
Ok(Packet { msg, node_id, hash: header_hash })
@ -220,7 +239,7 @@ impl From<NodeRecord> for NodeEndpoint {
}
}
/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03).).
/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03).
#[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
pub struct FindNode {
pub id: PeerId,
@ -234,12 +253,65 @@ pub struct Neighbours {
pub expire: u64,
}
/// A [ENRRequest packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#enrrequest-packet-0x05).
#[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
pub struct EnrRequest {
pub expire: u64,
}
/// A [ENRResponse packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#enrresponse-packet-0x06).
#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)]
pub struct EnrResponse {
pub request_hash: H256,
pub enr: Enr<SecretKey>,
}
// === impl EnrResponse ===
impl EnrResponse {
/// Returns the [`ForkId`] if set
///
/// See also <https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/discovery.go#L36>
pub fn eth_fork_id(&self) -> Option<ForkId> {
let mut maybe_fork_id = self.enr.get(b"eth")?;
ForkId::decode(&mut maybe_fork_id).ok()
}
}
impl Decodable for EnrResponse {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let b = &mut &**buf;
let rlp_head = Header::decode(b)?;
if !rlp_head.list {
return Err(DecodeError::UnexpectedString)
}
// let started_len = b.len();
let this = Self {
request_hash: reth_rlp::Decodable::decode(b)?,
enr: reth_rlp::Decodable::decode(b)?,
};
// TODO: `Decodable` can be derived once we have native reth_rlp decoding for ENR: <https://github.com/paradigmxyz/reth/issues/482>
// Skipping the size check here is fine since the `buf` is the UDP datagram
// let consumed = started_len - b.len();
// if consumed != rlp_head.payload_length {
// return Err(reth_rlp::DecodeError::ListLengthMismatch {
// expected: rlp_head.payload_length,
// got: consumed,
// })
// }
*buf = *b;
Ok(this)
}
}
/// A [Ping packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Ping {
pub from: NodeEndpoint,
pub to: NodeEndpoint,
pub expire: u64,
/// Optional enr_seq for <https://eips.ethereum.org/EIPS/eip-868>
pub enr_sq: Option<u64>,
}
impl Encodable for Ping {
@ -251,13 +323,33 @@ impl Encodable for Ping {
to: &'a NodeEndpoint,
expire: u64,
}
V4PingMessage {
version: 4, // version 4
from: &self.from,
to: &self.to,
expire: self.expire,
#[derive(RlpEncodable)]
struct V4PingMessageEIP868<'a> {
version: u32,
from: &'a NodeEndpoint,
to: &'a NodeEndpoint,
expire: u64,
enr_seq: u64,
}
if let Some(enr_seq) = self.enr_sq {
V4PingMessageEIP868 {
version: 4, // version 4
from: &self.from,
to: &self.to,
expire: self.expire,
enr_seq,
}
.encode(out);
} else {
V4PingMessage {
version: 4, // version 4
from: &self.from,
to: &self.to,
expire: self.expire,
}
.encode(out);
}
.encode(out)
}
}
@ -270,11 +362,18 @@ impl Decodable for Ping {
}
let started_len = b.len();
let _version = u32::decode(b)?;
let this = Self {
let mut this = Self {
from: Decodable::decode(b)?,
to: Decodable::decode(b)?,
expire: Decodable::decode(b)?,
enr_sq: None,
};
// only decode the ENR sequence if there's more data in the datagram to decode else skip
if b.has_remaining() {
this.enr_sq = Some(Decodable::decode(b)?);
}
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
@ -290,12 +389,39 @@ impl Decodable for Ping {
}
/// A [Pong packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#pong-packet-0x02).
// #[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Pong {
pub to: NodeEndpoint,
pub echo: H256,
pub expire: u64,
/// Optional enr_seq for <https://eips.ethereum.org/EIPS/eip-868>
pub enr_sq: Option<u64>,
}
impl Encodable for Pong {
fn encode(&self, out: &mut dyn BufMut) {
#[derive(RlpEncodable)]
struct PongMessageEIP868<'a> {
to: &'a NodeEndpoint,
echo: &'a H256,
expire: u64,
enr_seq: u64,
}
#[derive(RlpEncodable)]
struct PongMessage<'a> {
to: &'a NodeEndpoint,
echo: &'a H256,
expire: u64,
}
if let Some(enr_seq) = self.enr_sq {
PongMessageEIP868 { to: &self.to, echo: &self.echo, expire: self.expire, enr_seq }
.encode(out);
} else {
PongMessage { to: &self.to, echo: &self.echo, expire: self.expire }.encode(out);
}
}
}
impl Decodable for Pong {
@ -306,11 +432,18 @@ impl Decodable for Pong {
return Err(DecodeError::UnexpectedString)
}
let started_len = b.len();
let this = Self {
let mut this = Self {
to: Decodable::decode(b)?,
echo: Decodable::decode(b)?,
expire: Decodable::decode(b)?,
enr_sq: None,
};
// only decode the ENR sequence if there's more data in the datagram to decode else skip
if b.has_remaining() {
this.enr_sq = Some(Decodable::decode(b)?);
}
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
@ -321,6 +454,7 @@ impl Decodable for Pong {
let rem = rlp_head.payload_length - consumed;
b.advance(rem);
*buf = *b;
Ok(this)
}
}
@ -439,7 +573,12 @@ mod tests {
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = Ping { from: rng_endpoint(&mut rng), to: rng_endpoint(&mut rng), expire: 0 };
let msg = Ping {
from: rng_endpoint(&mut rng),
to: rng_endpoint(&mut rng),
expire: 0,
enr_sq: None,
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
@ -449,6 +588,69 @@ mod tests {
}
}
#[test]
fn test_ping_message_with_enr() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = Ping {
from: rng_endpoint(&mut rng),
to: rng_endpoint(&mut rng),
expire: 0,
enr_sq: Some(rng.gen()),
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = Ping::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_pong_message() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = Pong {
to: rng_endpoint(&mut rng),
echo: H256::random(),
expire: rng.gen(),
enr_sq: None,
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = Pong::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_pong_message_with_enr() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = Pong {
to: rng_endpoint(&mut rng),
echo: H256::random(),
expire: rng.gen(),
enr_sq: Some(rng.gen()),
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = Pong::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_hash_mismatch() {
let mut rng = thread_rng();

View File

@ -3,7 +3,7 @@
use crate::error::NetworkError;
use futures::StreamExt;
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, NodeRecord};
use reth_primitives::PeerId;
use reth_primitives::{ForkId, PeerId};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
@ -66,6 +66,12 @@ impl Discovery {
})
}
/// Updates the `eth:ForkId` field in discv4.
#[allow(unused)]
pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
self.discv4.set_eip868_rlp("eth".as_bytes().to_vec(), fork_id)
}
/// Bans the [`IpAddr`] in the discovery service.
pub(crate) fn ban_ip(&self, ip: IpAddr) {
self.discv4.ban_ip(ip)
@ -83,7 +89,7 @@ impl Discovery {
fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
match update {
DiscoveryUpdate::Added(node) | DiscoveryUpdate::Discovered(node) => {
DiscoveryUpdate::Added(node) => {
let id = node.id;
let addr = node.tcp_addr();
match self.discovered_nodes.entry(id) {
@ -94,6 +100,9 @@ impl Discovery {
}
}
}
DiscoveryUpdate::EnrForkId(node, fork_id) => {
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
}
DiscoveryUpdate::Removed(node) => {
self.discovered_nodes.remove(&node);
}
@ -128,6 +137,8 @@ impl Discovery {
pub enum DiscoveryEvent {
/// A new node was discovered
Discovered(PeerId, SocketAddr),
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
EnrForkId(PeerId, ForkId),
}
#[cfg(test)]

View File

@ -163,6 +163,8 @@ where
// merge configured boot nodes
discovery_v4_config.bootstrap_nodes.extend(boot_nodes.clone());
discovery_v4_config.add_eip868_pair("eth", status.forkid);
let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;
// need to retrieve the addr here since provided port could be `0`
let local_peer_id = discovery.local_id();

View File

@ -5,7 +5,7 @@ use crate::{
use futures::StreamExt;
use reth_eth_wire::{error::EthStreamError, DisconnectReason};
use reth_net_common::ban_list::BanList;
use reth_primitives::PeerId;
use reth_primitives::{ForkId, PeerId};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
fmt::Display,
@ -236,6 +236,17 @@ impl PeersManager {
self.fill_outbound_slots();
}
/// Called as follow-up for a discovered peer.
///
/// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
/// protocol
pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
trace!(target : "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
peer.fork_id = Some(fork_id);
}
}
/// Called for a newly discovered peer.
///
/// If the peer already exists, then the address will be updated. If the addresses differ, the
@ -274,6 +285,8 @@ impl PeersManager {
/// Returns the idle peer with the highest reputation.
///
/// Peers with a `forkId` are considered better than peers without.
///
/// Returns `None` if no peer is available.
fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| peer.state.is_unconnected());
@ -282,8 +295,18 @@ impl PeersManager {
let mut best_peer = unconnected.next()?;
for maybe_better in unconnected {
if maybe_better.1.reputation > best_peer.1.reputation {
best_peer = maybe_better;
match (maybe_better.1.fork_id.as_ref(), best_peer.1.fork_id.as_ref()) {
(Some(_), Some(_)) | (None, None) => {
if maybe_better.1.reputation > best_peer.1.reputation {
best_peer = maybe_better;
}
}
(Some(_), None) => {
if !maybe_better.1.is_banned() {
best_peer = maybe_better;
}
}
_ => {}
}
}
Some((*best_peer.0, best_peer.1))
@ -422,6 +445,8 @@ pub struct Peer {
reputation: i32,
/// The state of the connection, if any.
state: PeerConnectionState,
/// The [`ForkId`] that the peer announced via discovery.
fork_id: Option<ForkId>,
}
// === impl Peer ===
@ -432,7 +457,7 @@ impl Peer {
}
fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self {
Self { addr, state, reputation: 0 }
Self { addr, state, reputation: 0, fork_id: None }
}
/// Returns true if the peer's reputation is below the banned threshold.

View File

@ -14,7 +14,7 @@ use crate::{
use reth_eth_wire::{
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
};
use reth_primitives::{PeerId, H256};
use reth_primitives::{ForkId, PeerId, H256};
use reth_provider::BlockProvider;
use std::{
collections::{HashMap, VecDeque},
@ -216,6 +216,12 @@ where
self.state_fetcher.update_peer_block(peer_id, hash, number);
}
/// Invoked on a [`ForkId`] update
#[allow(unused)]
pub(crate) fn update_fork_id(&mut self, _fork_id: ForkId) {
todo!()
}
/// Invoked after a `NewBlock` message was received by the peer.
///
/// This will keep track of blocks we know a peer has
@ -254,8 +260,11 @@ where
/// Event hook for events received from the discovery service.
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
match event {
DiscoveryEvent::Discovered(node, addr) => {
self.peers_manager.add_discovered_node(node, addr);
DiscoveryEvent::Discovered(peer, addr) => {
self.peers_manager.add_discovered_node(peer, addr);
}
DiscoveryEvent::EnrForkId(peer, fork_id) => {
self.peers_manager.set_discovered_fork_id(peer, fork_id);
}
}
}

View File

@ -13,7 +13,8 @@ use reth_network::{NetworkConfig, NetworkEvent, NetworkManager, PeersConfig};
use reth_primitives::PeerId;
use reth_provider::test_utils::TestApi;
use secp256k1::SecretKey;
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::task;
#[tokio::test(flavor = "multi_thread")]
async fn test_establish_connections() {
@ -111,6 +112,14 @@ async fn test_connect_with_builder() {
tokio::join!(network, requests);
});
let h = handle.clone();
task::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
dbg!(h.num_connected_peers());
}
});
while let Some(ev) = events.next().await {
dbg!(ev);
}