From 635203759b5703aa77444c143cac096f5eb13d5c Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 16 Dec 2022 07:14:19 -0500 Subject: [PATCH] feat(discv4): support eth entries (#447) * add ENRRequest and ENRResponse messages * todo: encode and decode impls * scaffold enrrequest and enrresponse msgs * implement encodable and decodable for enr * cargo fmt * impl sending enr requests * silence clippy for now * add todos for enrrequest and response in mocks * make payload length fold more explicit * feat: add enr support to ping pong * integrate enr * add update methods * add enr handling * feat: add enr handling * integrate fork id * fix: set frontier forkid * disable eip868 by default * address comments Co-authored-by: Matthias Seitz --- Cargo.lock | 6 + crates/common/rlp/Cargo.toml | 8 + crates/common/rlp/src/decode.rs | 85 +++++ crates/common/rlp/src/encode.rs | 65 ++++ crates/net/discv4/Cargo.toml | 5 +- crates/net/discv4/src/config.rs | 91 ++++- crates/net/discv4/src/lib.rs | 443 ++++++++++++++++++++---- crates/net/discv4/src/mock.rs | 24 +- crates/net/discv4/src/node.rs | 3 + crates/net/discv4/src/proto.rs | 228 +++++++++++- crates/net/network/src/discovery.rs | 15 +- crates/net/network/src/manager.rs | 2 + crates/net/network/src/peers/manager.rs | 33 +- crates/net/network/src/state.rs | 15 +- crates/net/network/tests/it/connect.rs | 11 +- 15 files changed, 926 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8d0a2276..6d6e2b0d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1160,6 +1160,7 @@ dependencies = [ "log", "rand 0.8.5", "rlp", + "secp256k1", "serde", "sha3", "zeroize", @@ -3264,6 +3265,7 @@ version = "0.1.0" dependencies = [ "bytes", "discv5", + "enr 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array", "hex", "public-ip", @@ -3565,11 +3567,15 @@ dependencies = [ "auto_impl", "bytes", "criterion", + "enr 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types", "ethnum", "hex-literal", + "rand 0.8.5", "reth-rlp", "reth-rlp-derive", + "rlp", + "secp256k1", "smol_str", ] diff --git a/crates/common/rlp/Cargo.toml b/crates/common/rlp/Cargo.toml index ede54aa3b..8025cfc8e 100644 --- a/crates/common/rlp/Cargo.toml +++ b/crates/common/rlp/Cargo.toml @@ -12,6 +12,8 @@ auto_impl = "1" bytes = { version = "1", default-features = false } ethnum = { version = "1", default-features = false, optional = true } smol_str = { version = "0.1", default-features = false, optional = true } +enr = { version = "0.7", default-features = false, optional = true } +rlp = { version = "0.5.2", default-features = false, optional = true } ethereum-types = { version = "0.14", features = ["codec"], optional = true } reth-rlp-derive = { version = "0.1", path = "../rlp-derive", optional = true } @@ -21,15 +23,21 @@ reth-rlp = { path = ".", package = "reth-rlp", features = [ "std", "ethnum", "ethereum-types", + "enr", "smol_str" ] } criterion = "0.4.0" hex-literal = "0.3" +rand = "0.8" +secp256k1 = { version = "0.24", features = [ + "rand-std", +] } [features] alloc = [] derive = ["reth-rlp-derive"] std = ["alloc"] +enr = ["dep:enr", "dep:rlp", "enr/rust-secp256k1", "derive"] [[bench]] name = "bench" diff --git a/crates/common/rlp/src/decode.rs b/crates/common/rlp/src/decode.rs index ca96be2f0..5d2abb9a0 100644 --- a/crates/common/rlp/src/decode.rs +++ b/crates/common/rlp/src/decode.rs @@ -384,6 +384,33 @@ impl Decodable for smol_str::SmolStr { } } +#[cfg(feature = "enr")] +impl Decodable for enr::Enr +where + K: enr::EnrKey, +{ + fn decode(buf: &mut &[u8]) -> Result { + // currently the only way to build an enr is to decode it using the rlp::Decodable trait + ::decode(&rlp::Rlp::new(buf)).map_err(|e| match e { + rlp::DecoderError::RlpIsTooShort => DecodeError::InputTooShort, + rlp::DecoderError::RlpInvalidLength => DecodeError::Overflow, + rlp::DecoderError::RlpExpectedToBeList => DecodeError::UnexpectedString, + rlp::DecoderError::RlpExpectedToBeData => DecodeError::UnexpectedList, + rlp::DecoderError::RlpDataLenWithZeroPrefix | + rlp::DecoderError::RlpListLenWithZeroPrefix => DecodeError::LeadingZero, + rlp::DecoderError::RlpInvalidIndirection => DecodeError::NonCanonicalSize, + rlp::DecoderError::RlpIncorrectListLen => { + DecodeError::Custom("incorrect list length when decoding rlp") + } + rlp::DecoderError::RlpIsTooBig => DecodeError::Custom("rlp is too big"), + rlp::DecoderError::RlpInconsistentLengthAndData => { + DecodeError::Custom("inconsistent length and data when decoding rlp") + } + rlp::DecoderError::Custom(s) => DecodeError::Custom(s), + }) + } +} + #[cfg(test)] mod tests { extern crate alloc; @@ -602,4 +629,62 @@ mod tests { (Err(DecodeError::UnexpectedList), &hex!("C0")[..]), ]) } + + // test vector from the enr library rlp encoding tests + // + #[cfg(feature = "enr")] + #[test] + fn decode_enr_rlp() { + use enr::{secp256k1::SecretKey, Enr, EnrPublicKey}; + use std::net::Ipv4Addr; + + let valid_record = hex!("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f"); + let signature = hex!("7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c"); + let expected_pubkey = + hex!("03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138"); + + let enr = Enr::::decode(&mut &valid_record[..]).unwrap(); + let pubkey = enr.public_key().encode(); + + assert_eq!(enr.ip4(), Some(Ipv4Addr::new(127, 0, 0, 1))); + assert_eq!(enr.id(), Some(String::from("v4"))); + assert_eq!(enr.udp4(), Some(30303)); + assert_eq!(enr.tcp4(), None); + assert_eq!(enr.signature(), &signature[..]); + assert_eq!(pubkey.to_vec(), expected_pubkey); + assert!(enr.verify()); + } + + // test vector from the enr library rlp encoding tests + // + #[cfg(feature = "enr")] + #[test] + fn encode_decode_enr_rlp() { + use enr::{secp256k1::SecretKey, Enr, EnrBuilder, EnrKey, EnrPublicKey}; + use std::net::Ipv4Addr; + + let key = SecretKey::new(&mut rand::rngs::OsRng); + let ip = Ipv4Addr::new(127, 0, 0, 1); + let tcp = 3000; + + let enr = { + let mut builder = EnrBuilder::new("v4"); + builder.ip(ip.into()); + builder.tcp4(tcp); + builder.build(&key).unwrap() + }; + + let mut encoded = BytesMut::new(); + enr.encode(&mut encoded); + + let decoded_enr = Enr::::decode(&mut &encoded[..]).unwrap(); + + assert_eq!(decoded_enr, enr); + + assert_eq!(decoded_enr.id(), Some("v4".into())); + assert_eq!(decoded_enr.ip4(), Some(ip)); + assert_eq!(decoded_enr.tcp4(), Some(tcp)); + assert_eq!(decoded_enr.public_key().encode(), key.public().encode()); + assert!(decoded_enr.verify()); + } } diff --git a/crates/common/rlp/src/encode.rs b/crates/common/rlp/src/encode.rs index 65f9bd098..d78ff4222 100644 --- a/crates/common/rlp/src/encode.rs +++ b/crates/common/rlp/src/encode.rs @@ -185,6 +185,38 @@ impl Encodable for smol_str::SmolStr { } } +#[cfg(feature = "enr")] +impl Encodable for enr::Enr +where + K: enr::EnrKey, +{ + fn encode(&self, out: &mut dyn BufMut) { + let payload_length = self.signature().length() + + self.seq().length() + + self.iter().fold(0, |acc, (k, v)| acc + k.as_slice().length() + v.len()); + + let header = Header { list: true, payload_length }; + header.encode(out); + + self.signature().encode(out); + self.seq().encode(out); + + for (k, v) in self.iter() { + // Keys are byte data + k.as_slice().encode(out); + // Values are raw RLP encoded data + out.put_slice(v); + } + } + + fn length(&self) -> usize { + let payload_length = self.signature().length() + + self.seq().length() + + self.iter().fold(0, |acc, (k, v)| acc + k.as_slice().length() + v.len()); + payload_length + length_of_length(payload_length) + } +} + #[cfg(feature = "ethnum")] mod ethnum_support { use super::*; @@ -577,4 +609,37 @@ mod tests { "abcdefgh".to_string().encode(&mut b); assert_eq!(&encoded(SmolStr::new("abcdefgh"))[..], b.as_ref()); } + + // test vector from the enr library rlp encoding tests + // + #[cfg(feature = "enr")] + #[test] + fn encode_known_rlp_enr() { + use crate::Decodable; + use enr::{secp256k1::SecretKey, Enr, EnrPublicKey}; + use std::net::Ipv4Addr; + + let valid_record = hex!("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f"); + let signature = hex!("7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c"); + let expected_pubkey = + hex!("03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138"); + + let enr = Enr::::decode(&mut &valid_record[..]).unwrap(); + let pubkey = enr.public_key().encode(); + + assert_eq!(enr.ip4(), Some(Ipv4Addr::new(127, 0, 0, 1))); + assert_eq!(enr.id(), Some(String::from("v4"))); + assert_eq!(enr.udp4(), Some(30303)); + assert_eq!(enr.tcp4(), None); + assert_eq!(enr.signature(), &signature[..]); + assert_eq!(pubkey.to_vec(), expected_pubkey); + assert!(enr.verify()); + + let mut encoded = BytesMut::new(); + enr.encode(&mut encoded); + assert_eq!(&encoded[..], &valid_record[..]); + + // ensure the length is equal + assert_eq!(enr.length(), valid_record.len()); + } } diff --git a/crates/net/discv4/Cargo.toml b/crates/net/discv4/Cargo.toml index ba25827ac..d06347960 100644 --- a/crates/net/discv4/Cargo.toml +++ b/crates/net/discv4/Cargo.toml @@ -12,7 +12,7 @@ Ethereum network support [dependencies] # reth reth-primitives = { path = "../../primitives" } -reth-rlp = { path = "../../common/rlp" } +reth-rlp = { path = "../../common/rlp", features = ["enr"] } reth-rlp-derive = { path = "../../common/rlp-derive" } reth-net-common = { path = "../common" } @@ -23,15 +23,16 @@ secp256k1 = { version = "0.24", features = [ "rand-std", "recovery", ] } +enr = { version = "0.7.0", default-features = false, features = ["rust-secp256k1"] } # async/futures tokio = { version = "1", features = ["io-util", "net", "time"] } tokio-stream = "0.1" # misc +bytes = "1.2" generic-array = "0.14" tracing = "0.1" -bytes = "1.2" thiserror = "1.0" url = "2.3" hex = "0.4" diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs index 30a0e3957..f6fdec954 100644 --- a/crates/net/discv4/src/config.rs +++ b/crates/net/discv4/src/config.rs @@ -1,10 +1,16 @@ +//! A set of configuration parameters to tune the discovery protocol. +//! +//! This basis of this file has been taken from the discv5 codebase: +//! https://github.com/sigp/discv5 + use crate::node::NodeRecord; +use bytes::{Bytes, BytesMut}; use reth_net_common::ban_list::BanList; -///! A set of configuration parameters to tune the discovery protocol. -// This basis of this file has been taken from the discv5 codebase: -// https://github.com/sigp/discv5 -use std::collections::HashSet; -use std::time::Duration; +use reth_rlp::Encodable; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; /// Configuration parameters that define the performance of the discovery network. #[derive(Clone, Debug)] @@ -21,7 +27,9 @@ pub struct Discv4Config { /// The rate at which lookups should be triggered. pub lookup_interval: Duration, /// The duration of we consider a FindNode request timed out. - pub find_node_timeout: Duration, + pub request_timeout: Duration, + /// The duration after which we consider an enr request timed out. + pub enr_timeout: Duration, /// The duration we set for neighbours responses pub neighbours_timeout: Duration, /// Provides a way to ban peers and ips. @@ -38,6 +46,10 @@ pub struct Discv4Config { pub enable_dht_random_walk: bool, /// Whether to automatically lookup peers. pub enable_lookup: bool, + /// Whether to enable EIP-868 extension + pub enable_eip868: bool, + /// Additional pairs to include in The [`Enr`](enr::Enr) if EIP-868 extension is enabled + pub additional_eip868_rlp_pairs: HashMap, Bytes>, } impl Discv4Config { @@ -45,6 +57,30 @@ impl Discv4Config { pub fn builder() -> Discv4ConfigBuilder { Default::default() } + + /// Add another key value pair to include in the ENR + pub fn add_eip868_pair(&mut self, key: impl AsRef<[u8]>, value: impl Encodable) -> &mut Self { + let mut buf = BytesMut::new(); + value.encode(&mut buf); + self.add_eip868_rlp_pair(key, buf.freeze()) + } + + /// Add another key value pair to include in the ENR + pub fn add_eip868_rlp_pair(&mut self, key: impl AsRef<[u8]>, rlp: Bytes) -> &mut Self { + self.additional_eip868_rlp_pairs.insert(key.as_ref().to_vec(), rlp); + self + } + + /// Extend additional key value pairs to include in the ENR + pub fn extend_eip868_rlp_pairs( + &mut self, + pairs: impl IntoIterator, Bytes)>, + ) -> &mut Self { + for (k, v) in pairs.into_iter() { + self.add_eip868_rlp_pair(k, v); + } + self + } } impl Default for Discv4Config { @@ -54,14 +90,17 @@ impl Default for Discv4Config { request_retries: 1, ping_interval: Duration::from_secs(300), ping_timeout: Duration::from_secs(5), - find_node_timeout: Duration::from_secs(2), - neighbours_timeout: Duration::from_secs(30), + request_timeout: Duration::from_secs(20), + enr_timeout: Duration::from_secs(5), + neighbours_timeout: Duration::from_secs(5), lookup_interval: Duration::from_secs(20), ban_list: Default::default(), ban_duration: Some(Duration::from_secs(3600)), // 1 hour bootstrap_nodes: Default::default(), enable_dht_random_walk: true, enable_lookup: true, + enable_eip868: true, + additional_eip868_rlp_pairs: Default::default(), } } } @@ -97,6 +136,12 @@ impl Discv4ConfigBuilder { self } + /// Sets the timeout for enr requests + pub fn enr_request_timeout(&mut self, duration: Duration) -> &mut Self { + self.config.enr_timeout = duration; + self + } + /// Whether to discover random nodes in the DHT. pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self { self.config.enable_dht_random_walk = enable_dht_random_walk; @@ -109,6 +154,36 @@ impl Discv4ConfigBuilder { self } + /// Whether to enable EIP-868 + pub fn enable_eip868(&mut self, enable_eip868: bool) -> &mut Self { + self.config.enable_eip868 = enable_eip868; + self + } + + /// Add another key value pair to include in the ENR + pub fn add_eip868_pair(&mut self, key: impl AsRef<[u8]>, value: impl Encodable) -> &mut Self { + let mut buf = BytesMut::new(); + value.encode(&mut buf); + self.add_eip868_rlp_pair(key, buf.freeze()) + } + + /// Add another key value pair to include in the ENR + pub fn add_eip868_rlp_pair(&mut self, key: impl AsRef<[u8]>, rlp: Bytes) -> &mut Self { + self.config.additional_eip868_rlp_pairs.insert(key.as_ref().to_vec(), rlp); + self + } + + /// Extend additional key value pairs to include in the ENR + pub fn extend_eip868_rlp_pairs( + &mut self, + pairs: impl IntoIterator, Bytes)>, + ) -> &mut Self { + for (k, v) in pairs.into_iter() { + self.add_eip868_rlp_pair(k, v); + } + self + } + /// A set of lists that can ban IP's or PeerIds from the server. See /// [`BanList`]. pub fn ban_list(&mut self, ban_list: BanList) -> &mut Self { diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index c7fdd8cea..73e3c66ef 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -22,15 +22,18 @@ use crate::{ node::{kad_key, NodeKey}, proto::{FindNode, Message, Neighbours, Packet, Ping, Pong}, }; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use discv5::{ + kbucket, kbucket::{ Distance, Entry as BucketEntry, FailureReason, InsertResult, KBucketsTable, NodeStatus, MAX_NODES_PER_BUCKET, }, ConnectionDirection, ConnectionState, }; -use reth_primitives::{PeerId, H256}; +use enr::{Enr, EnrBuilder}; +use proto::{EnrRequest, EnrResponse}; +use reth_primitives::{ForkId, PeerId, H256}; use secp256k1::SecretKey; use std::{ cell::RefCell, @@ -169,19 +172,18 @@ impl Discv4 { /// ``` pub async fn bind( local_address: SocketAddr, - mut local_enr: NodeRecord, + mut local_node_record: NodeRecord, secret_key: SecretKey, config: Discv4Config, ) -> io::Result<(Self, Discv4Service)> { let socket = UdpSocket::bind(local_address).await?; let local_addr = socket.local_addr()?; - local_enr.udp_port = local_addr.port(); + local_node_record.udp_port = local_addr.port(); trace!( target : "discv4", ?local_addr,"opened UDP socket"); - // We don't expect many commands, so the buffer can be quite small here. - let (to_service, rx) = mpsc::channel(5); + let (to_service, rx) = mpsc::channel(100); let service = - Discv4Service::new(socket, local_addr, local_enr, secret_key, config, Some(rx)); + Discv4Service::new(socket, local_addr, local_node_record, secret_key, config, Some(rx)); let discv4 = Discv4 { local_addr, to_service }; Ok((discv4, service)) } @@ -230,8 +232,7 @@ impl Discv4 { /// Removes the peer from the table, if it exists. pub fn remove_peer(&self, node_id: PeerId) { let cmd = Discv4Command::Remove(node_id); - // we want this message to arrive, so we clone the sender - let _ = self.to_service.clone().try_send(cmd); + self.safe_send_to_service(cmd); } /// Adds the peer and id to the ban list. @@ -239,16 +240,14 @@ impl Discv4 { /// This will prevent any future inclusion in the table pub fn ban(&self, node_id: PeerId, ip: IpAddr) { let cmd = Discv4Command::Ban(node_id, ip); - // we want this message to arrive, so we clone the sender - let _ = self.to_service.clone().try_send(cmd); + self.safe_send_to_service(cmd); } /// Adds the ip to the ban list. /// /// This will prevent any future inclusion in the table pub fn ban_ip(&self, ip: IpAddr) { let cmd = Discv4Command::BanIp(ip); - // we want this message to arrive, so we clone the sender - let _ = self.to_service.clone().try_send(cmd); + self.safe_send_to_service(cmd); } /// Adds the peer to the ban list. @@ -256,7 +255,38 @@ impl Discv4 { /// This will prevent any future inclusion in the table pub fn ban_node(&self, node_id: PeerId) { let cmd = Discv4Command::BanPeer(node_id); - // we want this message to arrive, so we clone the sender + self.safe_send_to_service(cmd); + } + + /// Sets the tcp port + /// + /// This will update our [`NodeRecord`]'s tcp port. + pub fn set_tcp_port(&self, port: u16) { + let cmd = Discv4Command::SetTcpPort(port); + self.safe_send_to_service(cmd); + } + + /// Sets the pair in the EIP-868 [`Enr`] of the node. + /// + /// If the key already exists, this will update it. + /// + /// CAUTION: The value **must** be rlp encoded + pub fn set_eip868_rlp_pair(&self, key: Vec, rlp: Bytes) { + let cmd = Discv4Command::SetEIP868RLPPair { key, rlp }; + self.safe_send_to_service(cmd); + } + + /// Sets the pair in the EIP-868 [`Enr`] of the node. + /// + /// If the key already exists, this will update it. + pub fn set_eip868_rlp(&self, key: Vec, value: impl reth_rlp::Encodable) { + let mut buf = BytesMut::new(); + value.encode(&mut buf); + self.set_eip868_rlp_pair(key, buf.freeze()) + } + + fn safe_send_to_service(&self, cmd: Discv4Command) { + // we want this message to always arrive, so we clone the sender let _ = self.to_service.clone().try_send(cmd); } @@ -284,8 +314,10 @@ impl Discv4 { pub struct Discv4Service { /// Local address of the UDP socket. local_address: SocketAddr, + /// The local ENR for EIP-868 + local_eip_868_enr: Enr, /// Local ENR of the server. - local_enr: NodeRecord, + local_node_record: NodeRecord, /// The secret key used to sign payloads secret_key: SecretKey, /// The UDP socket for sending and receiving messages. @@ -313,6 +345,8 @@ pub struct Discv4Service { pending_pings: HashMap, /// Currently active FindNode requests pending_find_nodes: HashMap, + /// Currently active ENR requests + pending_enr_requests: HashMap, /// Commands listener commands_rx: Option>, /// All subscribers for table updates @@ -334,7 +368,7 @@ impl Discv4Service { pub(crate) fn new( socket: UdpSocket, local_address: SocketAddr, - local_enr: NodeRecord, + local_node_record: NodeRecord, secret_key: SecretKey, config: Discv4Config, commands_rx: Option>, @@ -349,13 +383,13 @@ impl Discv4Service { let mut tasks = JoinSet::<()>::new(); let udp = Arc::clone(&socket); - tasks.spawn(async move { receive_loop(udp, ingress_tx, local_enr.id).await }); + tasks.spawn(async move { receive_loop(udp, ingress_tx, local_node_record.id).await }); let udp = Arc::clone(&socket); tasks.spawn(async move { send_loop(udp, egress_rx).await }); let kbuckets = KBucketsTable::new( - local_enr.key(), + local_node_record.key(), Duration::from_secs(60), MAX_NODES_PER_BUCKET, None, @@ -364,9 +398,17 @@ impl Discv4Service { let self_lookup_interval = tokio::time::interval(config.lookup_interval); - let ping_interval = tokio::time::interval(config.ping_interval); + // Wait `ping_interval` and then start pinging every `ping_interval` because we want to wait + // for + let ping_interval = tokio::time::interval_at( + tokio::time::Instant::now() + config.ping_interval, + config.ping_interval, + ); - let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout); + let evict_expired_requests_interval = tokio::time::interval_at( + tokio::time::Instant::now() + config.request_timeout, + config.request_timeout, + ); let lookup_rotator = if config.enable_dht_random_walk { LookupTargetRotator::default() @@ -374,9 +416,29 @@ impl Discv4Service { LookupTargetRotator::local_only() }; + // for EIP-868 construct an ENR + let local_eip_868_enr = { + let mut builder = EnrBuilder::new("v4"); + builder.ip(local_node_record.address); + if local_node_record.address.is_ipv4() { + builder.udp4(local_node_record.udp_port); + builder.tcp4(local_node_record.tcp_port); + } else { + builder.udp6(local_node_record.udp_port); + builder.tcp6(local_node_record.tcp_port); + } + + for (key, val) in config.additional_eip868_rlp_pairs.iter() { + builder.add_value_rlp(key, val.clone()); + } + + builder.build(&secret_key).expect("v4 is set; qed") + }; + Discv4Service { local_address, - local_enr, + local_eip_868_enr, + local_node_record, _socket: socket, kbuckets, secret_key, @@ -386,6 +448,7 @@ impl Discv4Service { queued_pings: Default::default(), pending_pings: Default::default(), pending_find_nodes: Default::default(), + pending_enr_requests: Default::default(), check_timestamps: false, commands_rx, update_listeners: Vec::with_capacity(1), @@ -397,6 +460,15 @@ impl Discv4Service { } } + /// Returns the current enr sequence + fn enr_seq(&self) -> Option { + if self.config.enable_eip868 { + Some(self.local_eip_868_enr.seq()) + } else { + None + } + } + /// Returns the address of the UDP socket pub fn local_addr(&self) -> SocketAddr { self.local_address @@ -404,13 +476,13 @@ impl Discv4Service { /// Returns the ENR of this service. pub fn local_enr(&self) -> NodeRecord { - self.local_enr + self.local_node_record } /// Returns mutable reference to ENR for testing. #[cfg(test)] pub fn local_enr_mut(&mut self) -> &mut NodeRecord { - &mut self.local_enr + &mut self.local_node_record } /// Returns true if the given PeerId is currently in the bucket @@ -437,7 +509,8 @@ impl Discv4Service { for record in self.config.bootstrap_nodes.clone() { debug!(target : "discv4", ?record, "Adding bootstrap node"); let key = kad_key(record.id); - let entry = NodeEntry { record, last_seen: Instant::now() }; + let entry = + NodeEntry { record, last_seen: Instant::now(), last_enr_seq: None, fork_id: None }; // insert the boot node in the table let _ = self.kbuckets.insert_or_update( @@ -448,8 +521,7 @@ impl Discv4Service { direction: ConnectionDirection::Outgoing, }, ); - - self.try_ping(record, PingReason::Normal); + self.try_ping(record, PingReason::Initial); } } @@ -474,7 +546,7 @@ impl Discv4Service { /// Looks up the local node in the DHT. pub fn lookup_self(&mut self) { - self.lookup(self.local_enr.id) + self.lookup(self.local_node_record.id) } /// Looks up the given node in the DHT @@ -578,41 +650,128 @@ impl Discv4Service { let key = kad_key(node_id); let removed = self.kbuckets.remove(&key); if removed { + debug!(target: "discv4", ?node_id, "removed node"); self.notify(DiscoveryUpdate::Removed(node_id)); } removed } - /// Updates the node entry - fn update_node(&mut self, record: NodeRecord) { - if record.id == self.local_enr.id { + /// Update the entry on RE-ping + /// + /// On re-ping we check for a changed enr_seq if eip868 is enabled and when it changed we sent a + /// followup request to retrieve the updated ENR + fn update_on_reping(&mut self, record: NodeRecord, last_enr_seq: Option) { + if record.id == self.local_node_record.id { return } + + let last_enr_seq = if let Some(last_enr_seq) = last_enr_seq { + last_enr_seq + } else { + // no need to check for increased enr seq + let _ = self.insert_or_update(record, None); + return + }; + let key = kad_key(record.id); - let entry = NodeEntry { record, last_seen: Instant::now() }; - match self.kbuckets.insert_or_update( + + let mut fork_id = None; + let mut unchanged = false; + if let kbucket::Entry::Present(entry, _) = self.kbuckets.entry(&key) { + let value = entry.value(); + // we want to keep the forkid if it was set previously and only update the timestamp + if let Some(current_enr) = value.last_enr_seq { + if last_enr_seq == current_enr { + unchanged = true; + // ENR sequence is unchanged, so we can keep the fork id + fork_id = value.fork_id; + } + } + } + + // update the value but keep the existing forkid + if unchanged || fork_id.is_some() { + let entry = NodeEntry { + record, + last_seen: Instant::now(), + last_enr_seq: Some(last_enr_seq), + fork_id, + }; + let _ = self.kbuckets.insert_or_update( + &key, + entry, + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + }, + ); + } else { + // update the value and re-request the ENR + let _ = self.insert_or_update(record, Some(last_enr_seq)); + } + } + + /// Updates the node entry + /// + /// Inserts the given node into the table. + /// + /// If EIP-868 is enabled this will send a followup request to ask for the ENR of the node, if + /// the given enr seq is `Some`. + fn insert_or_update( + &mut self, + record: NodeRecord, + mut last_enr_seq: Option, + ) -> InsertResult { + if record.id == self.local_node_record.id { + return InsertResult::Failed(FailureReason::InvalidSelfUpdate) + } + + // If EIP868 extension is disabled then we want to ignore this + if !self.config.enable_eip868 { + last_enr_seq = None; + } + + // if the peer included a enr seq in the pong then we can try to request the ENR of that + // node + let has_enr_seq = last_enr_seq.is_some(); + + let key = kad_key(record.id); + let entry = NodeEntry { record, last_seen: Instant::now(), last_enr_seq, fork_id: None }; + let res = self.kbuckets.insert_or_update( &key, entry, NodeStatus { state: ConnectionState::Connected, direction: ConnectionDirection::Outgoing, }, - ) { + ); + + match &res { InsertResult::Inserted => { debug!(target : "discv4",?record, "inserted new record to table"); self.notify(DiscoveryUpdate::Added(record)); + + if has_enr_seq { + // request the ENR of the node + self.send_enr_request(record); + } } - InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => { + InsertResult::ValueUpdated => { trace!(target : "discv4",?record, "updated record"); + if has_enr_seq { + // request the ENR of the node + self.send_enr_request(record); + } } InsertResult::Failed(FailureReason::BucketFull) => { - debug!(target : "discv4", ?record, "discovered new record but bucket is full"); - self.notify(DiscoveryUpdate::Discovered(record)); + trace!(target : "discv4", ?record, "discovered new record but bucket is full"); } res => { warn!(target : "discv4",?record, ?res, "failed to insert"); } - } + }; + + res } /// Adds all nodes @@ -627,13 +786,8 @@ impl Discv4Service { /// If the node's not in the table yet, this will start a ping to get it added on ping. pub fn add_node(&mut self, record: NodeRecord) { let key = kad_key(record.id); - - #[allow(clippy::single_match)] - match self.kbuckets.entry(&key) { - BucketEntry::Absent(_) => self.try_ping(record, PingReason::Normal), - _ => { - // is already in the table - } + if let BucketEntry::Absent(_) = self.kbuckets.entry(&key) { + self.try_ping(record, PingReason::Initial) } } @@ -664,7 +818,12 @@ impl Discv4Service { self.add_node(record); // send the pong - let msg = Message::Pong(Pong { to: ping.from, echo: hash, expire: ping.expire }); + let msg = Message::Pong(Pong { + to: ping.from, + echo: hash, + expire: ping.expire, + enr_sq: self.enr_seq(), + }); self.send_packet(msg, remote_addr); } @@ -693,8 +852,12 @@ impl Discv4Service { pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> H256 { let remote_addr = node.udp_addr(); let id = node.id; - let ping = - Ping { from: self.local_enr.into(), to: node.into(), expire: self.ping_timeout() }; + let ping = Ping { + from: self.local_node_record.into(), + to: node.into(), + expire: self.ping_timeout(), + enr_sq: self.enr_seq(), + }; trace!(target : "discv4", ?ping, "sending ping"); let echo_hash = self.send_packet(Message::Ping(ping), remote_addr); @@ -703,6 +866,23 @@ impl Discv4Service { echo_hash } + /// Sends an enr request message to the node's UDP address. + /// + /// Returns the echo hash of the ping message. + pub(crate) fn send_enr_request(&mut self, node: NodeRecord) { + if !self.config.enable_eip868 { + return + } + let remote_addr = node.udp_addr(); + let enr_request = EnrRequest { expire: self.enr_request_timeout() }; + + trace!(target : "discv4", ?enr_request, "sending enr request"); + let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr); + + self.pending_enr_requests + .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash }); + } + /// Message handler for an incoming `Pong`. fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) { if self.is_expired(pong.expire) { @@ -724,15 +904,18 @@ impl Discv4Service { }; match reason { - PingReason::Normal => { - self.update_node(node); + PingReason::Initial => { + let _ = self.insert_or_update(node, pong.enr_sq); + } + PingReason::RePing => { + self.update_on_reping(node, pong.enr_sq); } PingReason::FindNode(target, status) => { // update the status of the node match status { NodeEntryStatus::Expired | NodeEntryStatus::Valid => { // update node in the table - self.update_node(node) + let _ = self.insert_or_update(node, pong.enr_sq); } NodeEntryStatus::IsLocal | NodeEntryStatus::Unknown => {} } @@ -740,7 +923,7 @@ impl Discv4Service { self.respond_closest(target, remote_addr); } PingReason::Lookup(node, ctx) => { - self.update_node(node); + let _ = self.insert_or_update(node, pong.enr_sq); self.find_node(&node, ctx); } } @@ -766,6 +949,70 @@ impl Discv4Service { } } + /// Handler for incoming `EnrResponse` message + fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) { + trace!(target : "discv4", ?remote_addr, ?msg, "received ENR response"); + if let Some(resp) = self.pending_enr_requests.remove(&id) { + if resp.echo_hash == msg.request_hash { + let key = kad_key(id); + if let Some((record, existing_fork_id)) = + self.kbuckets.get_bucket(&key).and_then(|bucket| { + bucket.get(&key).map(|entry| (entry.value.record, entry.value.fork_id)) + }) + { + let fork_id = msg.eth_fork_id(); + let entry = NodeEntry { + record, + last_seen: Instant::now(), + last_enr_seq: Some(msg.enr.seq()), + fork_id, + }; + let _ = self.kbuckets.insert_or_update( + &key, + entry, + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + }, + ); + + if fork_id == existing_fork_id { + // nothing to update + return + } + + if let Some(fork_id) = fork_id { + self.notify(DiscoveryUpdate::EnrForkId(record, fork_id)) + } + } + } + } + } + + /// Handler for incoming `EnrRequest` message + fn on_enr_request( + &mut self, + msg: EnrRequest, + remote_addr: SocketAddr, + id: PeerId, + request_hash: H256, + ) { + if !self.config.enable_eip868 || self.is_expired(msg.expire) { + return + } + + let key = kad_key(id); + if let BucketEntry::Present(_, _) = self.kbuckets.entry(&key) { + self.send_packet( + Message::EnrResponse(EnrResponse { + request_hash, + enr: self.local_eip_868_enr.clone(), + }), + remote_addr, + ); + } + } + /// Handler for incoming `Neighbours` messages that are handled if they're responses to /// `FindNode` requests fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) { @@ -800,7 +1047,7 @@ impl Discv4Service { } }; - let our_key = kad_key(self.local_enr.id); + let our_key = kad_key(self.local_node_record.id); // This is the recursive lookup step where we initiate new FindNode requests for new nodes // that where discovered. @@ -846,7 +1093,7 @@ impl Discv4Service { /// Returns the current status of the node fn node_status(&mut self, node: PeerId, addr: SocketAddr) -> NodeEntryStatus { - if node == self.local_enr.id { + if node == self.local_node_record.id { debug!( target : "discv4", ?node,"Got an incoming discovery request from self"); return NodeEntryStatus::IsLocal } @@ -873,6 +1120,10 @@ impl Discv4Service { } fn evict_expired_requests(&mut self, now: Instant) { + self.pending_enr_requests.retain(|_node_id, enr_request| { + now.duration_since(enr_request.sent_at) < self.config.ping_timeout + }); + let mut nodes_to_expire = Vec::new(); self.pending_pings.retain(|node_id, ping_request| { if now.duration_since(ping_request.sent_at) > self.config.ping_timeout { @@ -882,7 +1133,7 @@ impl Discv4Service { true }); self.pending_find_nodes.retain(|node_id, find_node_request| { - if now.duration_since(find_node_request.sent_at) > self.config.find_node_timeout { + if now.duration_since(find_node_request.sent_at) > self.config.request_timeout { if !find_node_request.answered { nodes_to_expire.push(*node_id); } @@ -890,8 +1141,11 @@ impl Discv4Service { } true }); + + debug!(target: "discv4", num=%nodes_to_expire.len(), "evicting nodes"); + for node_id in nodes_to_expire { - self.expire_node_request(node_id); + self.remove_node(node_id); } } @@ -901,16 +1155,10 @@ impl Discv4Service { nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen)); let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::>(); for node in to_ping { - self.try_ping(node, PingReason::Normal) + self.try_ping(node, PingReason::RePing) } } - /// Removes the node from the table - fn expire_node_request(&mut self, node_id: PeerId) { - let key = kad_key(node_id); - self.kbuckets.remove(&key); - } - /// Returns true if the expiration timestamp is considered invalid. fn is_expired(&self, expiration: u64) -> bool { self.ensure_timestamp(expiration).is_err() @@ -941,10 +1189,14 @@ impl Discv4Service { } fn find_node_timeout(&self) -> u64 { - (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.find_node_timeout) + (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout) .as_secs() } + fn enr_request_timeout(&self) -> u64 { + (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_timeout).as_secs() + } + fn send_neighbours_timeout(&self) -> u64 { (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_timeout) .as_secs() @@ -958,7 +1210,7 @@ impl Discv4Service { pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { // trigger self lookup if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() { - let target = self.lookup_rotator.next(&self.local_enr.id); + let target = self.lookup_rotator.next(&self.local_node_record.id); self.lookup_with(target, None); } @@ -979,7 +1231,7 @@ impl Discv4Service { if let Some(cmd) = cmd { match cmd { Discv4Command::Lookup { node_id, tx } => { - let node_id = node_id.unwrap_or(self.local_enr.id); + let node_id = node_id.unwrap_or(self.local_node_record.id); self.lookup_with(node_id, tx); } Discv4Command::Updates(tx) => { @@ -997,6 +1249,21 @@ impl Discv4Service { Discv4Command::BanIp(ip) => { self.ban_ip(ip); } + Discv4Command::SetEIP868RLPPair { key, rlp } => { + debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair"); + + let _ = + self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key); + } + Discv4Command::SetTcpPort(port) => { + debug!(target: "discv4", %port, "Update tcp port"); + self.local_node_record.tcp_port = port; + if self.local_node_record.address.is_ipv4() { + let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key); + } else { + let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key); + } + } } } else { is_done = true; @@ -1034,6 +1301,14 @@ impl Discv4Service { self.on_neighbours(msg, remote_addr, node_id); Poll::Ready(Discv4Event::Neighbours) } + Message::EnrRequest(msg) => { + self.on_enr_request(msg, remote_addr, node_id, hash); + Poll::Ready(Discv4Event::EnrRequest) + } + Message::EnrResponse(msg) => { + self.on_enr_response(msg, remote_addr, node_id); + Poll::Ready(Discv4Event::EnrResponse) + } } } } @@ -1068,6 +1343,10 @@ pub enum Discv4Event { FindNode, /// A `Neighbours` message was handled. Neighbours, + /// A `EnrRequest` message was handled. + EnrRequest, + /// A `EnrResponse` message was handled. + EnrResponse, } /// Continuously reads new messages from the channel and writes them to the socket @@ -1128,6 +1407,8 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i /// The commands sent from the frontend to the service enum Discv4Command { + SetTcpPort(u16), + SetEIP868RLPPair { key: Vec, rlp: Bytes }, Ban(PeerId, IpAddr), BanPeer(PeerId), BanIp(IpAddr), @@ -1332,6 +1613,13 @@ impl FindNodeRequest { } } +struct EnrRequestState { + // Timestamp when the request was sent. + sent_at: Instant, + // Hash sent in the Ping request + echo_hash: H256, +} + /// Stored node info. #[derive(Debug, Clone, Eq, PartialEq)] struct NodeEntry { @@ -1339,6 +1627,10 @@ struct NodeEntry { record: NodeRecord, /// Timestamp of last pong. last_seen: Instant, + /// Last enr seq we retrieved via a ENR request. + last_enr_seq: Option, + /// ForkId if retrieved via ENR requests. + fork_id: Option, } /// The status ofs a specific node @@ -1365,8 +1657,10 @@ impl NodeEntry { /// Represents why a ping is issued enum PingReason { - /// Standard ping - Normal, + /// Initial ping to a previously unknown peer. + Initial, + /// Re-ping a peer.. + RePing, /// Ping issued to adhere to endpoint proof procedure /// /// Once the expected PONG is received, the endpoint proof is complete and the find node can be @@ -1379,10 +1673,10 @@ enum PingReason { /// Represents node related updates state changes in the underlying node table #[derive(Debug, Clone)] pub enum DiscoveryUpdate { + /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`]. + EnrForkId(NodeRecord, ForkId), /// A new node was discovered _and_ added to the table. Added(NodeRecord), - /// A new node was discovered but _not_ added to the table because the bucket is full. - Discovered(NodeRecord), /// Node that was removed from the table Removed(PeerId), /// A series of updates @@ -1396,6 +1690,7 @@ mod tests { bootnodes::mainnet_nodes, mock::{create_discv4, create_discv4_with_config}, }; + use reth_primitives::{hex_literal::hex, ForkHash}; #[test] fn test_local_rotator() { @@ -1434,9 +1729,14 @@ mod tests { #[ignore] async fn test_lookup() { reth_tracing::init_tracing(); + let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 }; let all_nodes = mainnet_nodes(); - let config = Discv4Config::builder().add_boot_nodes(all_nodes).build(); + let config = Discv4Config::builder() + .add_boot_nodes(all_nodes) + .lookup_interval(Duration::from_secs(1)) + .add_eip868_pair("eth", fork_id) + .build(); let (_discv4, mut service) = create_discv4_with_config(config).await; let mut updates = service.update_stream(); @@ -1446,6 +1746,9 @@ mod tests { let mut table = HashMap::new(); while let Some(update) = updates.next().await { match update { + DiscoveryUpdate::EnrForkId(record, fork_id) => { + println!("{record:?}, {fork_id:?}"); + } DiscoveryUpdate::Added(record) => { table.insert(record.id, record); } diff --git a/crates/net/discv4/src/mock.rs b/crates/net/discv4/src/mock.rs index a908c5ea1..b4d0ff9cd 100644 --- a/crates/net/discv4/src/mock.rs +++ b/crates/net/discv4/src/mock.rs @@ -9,7 +9,7 @@ use crate::{ IngressReceiver, PeerId, SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS, }; use rand::{thread_rng, Rng, RngCore}; -use reth_primitives::H256; +use reth_primitives::{hex_literal::hex, ForkHash, ForkId, H256}; use secp256k1::{SecretKey, SECP256K1}; use std::{ collections::{HashMap, HashSet}, @@ -154,7 +154,12 @@ impl Stream for MockDiscovery { IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => match msg { Message::Ping(ping) => { if this.pending_pongs.remove(&node_id) { - let pong = Pong { to: ping.from, echo: hash, expire: ping.expire }; + let pong = Pong { + to: ping.from, + echo: hash, + expire: ping.expire, + enr_sq: None, + }; let msg = Message::Pong(pong.clone()); this.send_packet(msg, remote_addr); return Poll::Ready(Some(MockEvent::Pong { @@ -179,6 +184,7 @@ impl Stream for MockDiscovery { } } Message::Neighbours(_) => {} + Message::EnrRequest(_) | Message::EnrResponse(_) => todo!(), }, } } @@ -201,7 +207,8 @@ pub enum MockCommand { /// Creates a new testing instance for [`Discv4`] and its service pub async fn create_discv4() -> (Discv4, Discv4Service) { - create_discv4_with_config(Default::default()).await + let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 }; + create_discv4_with_config(Discv4Config::builder().add_eip868_pair("eth", fork_id).build()).await } /// Creates a new testing instance for [`Discv4`] and its service with the given config. @@ -254,8 +261,14 @@ pub fn rng_message(rng: &mut impl RngCore) -> Message { from: rng_endpoint(rng), to: rng_endpoint(rng), expire: rng.gen(), + enr_sq: None, + }), + 2 => Message::Pong(Pong { + to: rng_endpoint(rng), + echo: H256::random(), + expire: rng.gen(), + enr_sq: None, }), - 2 => Message::Pong(Pong { to: rng_endpoint(rng), echo: H256::random(), expire: rng.gen() }), 3 => Message::FindNode(FindNode { id: PeerId::random(), expire: rng.gen() }), 4 => { let num: usize = rng.gen_range(1..=SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS); @@ -272,6 +285,7 @@ pub fn rng_message(rng: &mut impl RngCore) -> Message { mod tests { use super::*; use crate::{Discv4Event, PingReason}; + use reth_primitives::{hex_literal::hex, ForkHash, ForkId}; use std::net::{IpAddr, Ipv4Addr}; /// This test creates two local UDP sockets. The mocked discovery service responds to specific @@ -297,7 +311,7 @@ mod tests { mockv4.queue_pong(discv_enr.id); // This sends a ping to the mock service - let echo_hash = service.send_ping(mock_enr, PingReason::Normal); + let echo_hash = service.send_ping(mock_enr, PingReason::Initial); // process the mock pong let event = mockv4.next().await.unwrap(); diff --git a/crates/net/discv4/src/node.rs b/crates/net/discv4/src/node.rs index 117285738..1c12de960 100644 --- a/crates/net/discv4/src/node.rs +++ b/crates/net/discv4/src/node.rs @@ -39,6 +39,9 @@ pub(crate) fn kad_key(node: PeerId) -> discv5::Key { } /// Represents a ENR in discv4. +/// +/// Note: this is only an excerpt of the [ENR](enr::Enr) datastructure which is sent in Neighbours +/// message. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] pub struct NodeRecord { /// The Address of a node. diff --git a/crates/net/discv4/src/proto.rs b/crates/net/discv4/src/proto.rs index 8a6945aaf..f60d403d6 100644 --- a/crates/net/discv4/src/proto.rs +++ b/crates/net/discv4/src/proto.rs @@ -2,7 +2,8 @@ use crate::{error::DecodePacketError, node::NodeRecord, PeerId, MAX_PACKET_SIZE, MIN_PACKET_SIZE}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use reth_primitives::{keccak256, H256}; +use enr::Enr; +use reth_primitives::{keccak256, ForkId, H256}; use reth_rlp::{Decodable, DecodeError, Encodable, Header}; use reth_rlp_derive::{RlpDecodable, RlpEncodable}; use secp256k1::{ @@ -21,6 +22,8 @@ pub enum MessageId { Pong = 2, FindNode = 3, Neighbours = 4, + EnrRequest = 5, + EnrResponse = 6, } impl MessageId { @@ -31,6 +34,8 @@ impl MessageId { 2 => MessageId::Pong, 3 => MessageId::FindNode, 4 => MessageId::Neighbours, + 5 => MessageId::EnrRequest, + 6 => MessageId::EnrResponse, _ => return Err(msg), }; Ok(msg) @@ -44,6 +49,8 @@ pub enum Message { Pong(Pong), FindNode(FindNode), Neighbours(Neighbours), + EnrRequest(EnrRequest), + EnrResponse(EnrResponse), } // === impl Message === @@ -56,6 +63,8 @@ impl Message { Message::Pong(_) => MessageId::Pong, Message::FindNode(_) => MessageId::FindNode, Message::Neighbours(_) => MessageId::Neighbours, + Message::EnrRequest(_) => MessageId::EnrRequest, + Message::EnrResponse(_) => MessageId::EnrResponse, } } @@ -89,6 +98,14 @@ impl Message { payload.put_u8(4); message.encode(&mut payload); } + Message::EnrRequest(message) => { + payload.put_u8(5); + message.encode(&mut payload); + } + Message::EnrResponse(message) => { + payload.put_u8(6); + message.encode(&mut payload); + } } let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable( @@ -146,6 +163,8 @@ impl Message { MessageId::Pong => Message::Pong(Pong::decode(payload)?), MessageId::FindNode => Message::FindNode(FindNode::decode(payload)?), MessageId::Neighbours => Message::Neighbours(Neighbours::decode(payload)?), + MessageId::EnrRequest => Message::EnrRequest(EnrRequest::decode(payload)?), + MessageId::EnrResponse => Message::EnrResponse(EnrResponse::decode(payload)?), }; Ok(Packet { msg, node_id, hash: header_hash }) @@ -220,7 +239,7 @@ impl From for NodeEndpoint { } } -/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03).). +/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03). #[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] pub struct FindNode { pub id: PeerId, @@ -234,12 +253,65 @@ pub struct Neighbours { pub expire: u64, } +/// A [ENRRequest packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#enrrequest-packet-0x05). +#[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] +pub struct EnrRequest { + pub expire: u64, +} + +/// A [ENRResponse packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#enrresponse-packet-0x06). +#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)] +pub struct EnrResponse { + pub request_hash: H256, + pub enr: Enr, +} + +// === impl EnrResponse === + +impl EnrResponse { + /// Returns the [`ForkId`] if set + /// + /// See also + pub fn eth_fork_id(&self) -> Option { + let mut maybe_fork_id = self.enr.get(b"eth")?; + ForkId::decode(&mut maybe_fork_id).ok() + } +} + +impl Decodable for EnrResponse { + fn decode(buf: &mut &[u8]) -> Result { + let b = &mut &**buf; + let rlp_head = Header::decode(b)?; + if !rlp_head.list { + return Err(DecodeError::UnexpectedString) + } + // let started_len = b.len(); + let this = Self { + request_hash: reth_rlp::Decodable::decode(b)?, + enr: reth_rlp::Decodable::decode(b)?, + }; + // TODO: `Decodable` can be derived once we have native reth_rlp decoding for ENR: + // Skipping the size check here is fine since the `buf` is the UDP datagram + // let consumed = started_len - b.len(); + // if consumed != rlp_head.payload_length { + // return Err(reth_rlp::DecodeError::ListLengthMismatch { + // expected: rlp_head.payload_length, + // got: consumed, + // }) + // } + *buf = *b; + Ok(this) + } +} + /// A [Ping packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01). #[derive(Debug, Clone, Eq, PartialEq)] pub struct Ping { pub from: NodeEndpoint, pub to: NodeEndpoint, pub expire: u64, + /// Optional enr_seq for + pub enr_sq: Option, } impl Encodable for Ping { @@ -251,13 +323,33 @@ impl Encodable for Ping { to: &'a NodeEndpoint, expire: u64, } - V4PingMessage { - version: 4, // version 4 - from: &self.from, - to: &self.to, - expire: self.expire, + + #[derive(RlpEncodable)] + struct V4PingMessageEIP868<'a> { + version: u32, + from: &'a NodeEndpoint, + to: &'a NodeEndpoint, + expire: u64, + enr_seq: u64, + } + if let Some(enr_seq) = self.enr_sq { + V4PingMessageEIP868 { + version: 4, // version 4 + from: &self.from, + to: &self.to, + expire: self.expire, + enr_seq, + } + .encode(out); + } else { + V4PingMessage { + version: 4, // version 4 + from: &self.from, + to: &self.to, + expire: self.expire, + } + .encode(out); } - .encode(out) } } @@ -270,11 +362,18 @@ impl Decodable for Ping { } let started_len = b.len(); let _version = u32::decode(b)?; - let this = Self { + let mut this = Self { from: Decodable::decode(b)?, to: Decodable::decode(b)?, expire: Decodable::decode(b)?, + enr_sq: None, }; + + // only decode the ENR sequence if there's more data in the datagram to decode else skip + if b.has_remaining() { + this.enr_sq = Some(Decodable::decode(b)?); + } + let consumed = started_len - b.len(); if consumed > rlp_head.payload_length { return Err(DecodeError::ListLengthMismatch { @@ -290,12 +389,39 @@ impl Decodable for Ping { } /// A [Pong packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#pong-packet-0x02). -// #[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] -#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Pong { pub to: NodeEndpoint, pub echo: H256, pub expire: u64, + /// Optional enr_seq for + pub enr_sq: Option, +} + +impl Encodable for Pong { + fn encode(&self, out: &mut dyn BufMut) { + #[derive(RlpEncodable)] + struct PongMessageEIP868<'a> { + to: &'a NodeEndpoint, + echo: &'a H256, + expire: u64, + enr_seq: u64, + } + + #[derive(RlpEncodable)] + struct PongMessage<'a> { + to: &'a NodeEndpoint, + echo: &'a H256, + expire: u64, + } + + if let Some(enr_seq) = self.enr_sq { + PongMessageEIP868 { to: &self.to, echo: &self.echo, expire: self.expire, enr_seq } + .encode(out); + } else { + PongMessage { to: &self.to, echo: &self.echo, expire: self.expire }.encode(out); + } + } } impl Decodable for Pong { @@ -306,11 +432,18 @@ impl Decodable for Pong { return Err(DecodeError::UnexpectedString) } let started_len = b.len(); - let this = Self { + let mut this = Self { to: Decodable::decode(b)?, echo: Decodable::decode(b)?, expire: Decodable::decode(b)?, + enr_sq: None, }; + + // only decode the ENR sequence if there's more data in the datagram to decode else skip + if b.has_remaining() { + this.enr_sq = Some(Decodable::decode(b)?); + } + let consumed = started_len - b.len(); if consumed > rlp_head.payload_length { return Err(DecodeError::ListLengthMismatch { @@ -321,6 +454,7 @@ impl Decodable for Pong { let rem = rlp_head.payload_length - consumed; b.advance(rem); *buf = *b; + Ok(this) } } @@ -439,7 +573,12 @@ mod tests { for _ in 0..100 { let mut ip = [0u8; 16]; rng.fill_bytes(&mut ip); - let msg = Ping { from: rng_endpoint(&mut rng), to: rng_endpoint(&mut rng), expire: 0 }; + let msg = Ping { + from: rng_endpoint(&mut rng), + to: rng_endpoint(&mut rng), + expire: 0, + enr_sq: None, + }; let mut buf = BytesMut::new(); msg.encode(&mut buf); @@ -449,6 +588,69 @@ mod tests { } } + #[test] + fn test_ping_message_with_enr() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let msg = Ping { + from: rng_endpoint(&mut rng), + to: rng_endpoint(&mut rng), + expire: 0, + enr_sq: Some(rng.gen()), + }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = Ping::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + + #[test] + fn test_pong_message() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let msg = Pong { + to: rng_endpoint(&mut rng), + echo: H256::random(), + expire: rng.gen(), + enr_sq: None, + }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = Pong::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + + #[test] + fn test_pong_message_with_enr() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let msg = Pong { + to: rng_endpoint(&mut rng), + echo: H256::random(), + expire: rng.gen(), + enr_sq: Some(rng.gen()), + }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = Pong::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + #[test] fn test_hash_mismatch() { let mut rng = thread_rng(); diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index a6bf7c455..78f5d0f99 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -3,7 +3,7 @@ use crate::error::NetworkError; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, NodeRecord}; -use reth_primitives::PeerId; +use reth_primitives::{ForkId, PeerId}; use secp256k1::SecretKey; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, @@ -66,6 +66,12 @@ impl Discovery { }) } + /// Updates the `eth:ForkId` field in discv4. + #[allow(unused)] + pub(crate) fn update_fork_id(&self, fork_id: ForkId) { + self.discv4.set_eip868_rlp("eth".as_bytes().to_vec(), fork_id) + } + /// Bans the [`IpAddr`] in the discovery service. pub(crate) fn ban_ip(&self, ip: IpAddr) { self.discv4.ban_ip(ip) @@ -83,7 +89,7 @@ impl Discovery { fn on_discv4_update(&mut self, update: DiscoveryUpdate) { match update { - DiscoveryUpdate::Added(node) | DiscoveryUpdate::Discovered(node) => { + DiscoveryUpdate::Added(node) => { let id = node.id; let addr = node.tcp_addr(); match self.discovered_nodes.entry(id) { @@ -94,6 +100,9 @@ impl Discovery { } } } + DiscoveryUpdate::EnrForkId(node, fork_id) => { + self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id)) + } DiscoveryUpdate::Removed(node) => { self.discovered_nodes.remove(&node); } @@ -128,6 +137,8 @@ impl Discovery { pub enum DiscoveryEvent { /// A new node was discovered Discovered(PeerId, SocketAddr), + /// Retrieved a [`ForkId`] from the peer via ENR request, See + EnrForkId(PeerId, ForkId), } #[cfg(test)] diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 6f0e3a521..124682cb3 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -163,6 +163,8 @@ where // merge configured boot nodes discovery_v4_config.bootstrap_nodes.extend(boot_nodes.clone()); + discovery_v4_config.add_eip868_pair("eth", status.forkid); + let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?; // need to retrieve the addr here since provided port could be `0` let local_peer_id = discovery.local_id(); diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 079f89239..01a60a8dc 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -5,7 +5,7 @@ use crate::{ use futures::StreamExt; use reth_eth_wire::{error::EthStreamError, DisconnectReason}; use reth_net_common::ban_list::BanList; -use reth_primitives::PeerId; +use reth_primitives::{ForkId, PeerId}; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, fmt::Display, @@ -236,6 +236,17 @@ impl PeersManager { self.fill_outbound_slots(); } + /// Called as follow-up for a discovered peer. + /// + /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery + /// protocol + pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + trace!(target : "net::peers", ?peer_id, ?fork_id, "set discovered fork id"); + peer.fork_id = Some(fork_id); + } + } + /// Called for a newly discovered peer. /// /// If the peer already exists, then the address will be updated. If the addresses differ, the @@ -274,6 +285,8 @@ impl PeersManager { /// Returns the idle peer with the highest reputation. /// + /// Peers with a `forkId` are considered better than peers without. + /// /// Returns `None` if no peer is available. fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> { let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| peer.state.is_unconnected()); @@ -282,8 +295,18 @@ impl PeersManager { let mut best_peer = unconnected.next()?; for maybe_better in unconnected { - if maybe_better.1.reputation > best_peer.1.reputation { - best_peer = maybe_better; + match (maybe_better.1.fork_id.as_ref(), best_peer.1.fork_id.as_ref()) { + (Some(_), Some(_)) | (None, None) => { + if maybe_better.1.reputation > best_peer.1.reputation { + best_peer = maybe_better; + } + } + (Some(_), None) => { + if !maybe_better.1.is_banned() { + best_peer = maybe_better; + } + } + _ => {} } } Some((*best_peer.0, best_peer.1)) @@ -422,6 +445,8 @@ pub struct Peer { reputation: i32, /// The state of the connection, if any. state: PeerConnectionState, + /// The [`ForkId`] that the peer announced via discovery. + fork_id: Option, } // === impl Peer === @@ -432,7 +457,7 @@ impl Peer { } fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self { - Self { addr, state, reputation: 0 } + Self { addr, state, reputation: 0, fork_id: None } } /// Returns true if the peer's reputation is below the banned threshold. diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index a0bd87819..a51634267 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -14,7 +14,7 @@ use crate::{ use reth_eth_wire::{ capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, }; -use reth_primitives::{PeerId, H256}; +use reth_primitives::{ForkId, PeerId, H256}; use reth_provider::BlockProvider; use std::{ collections::{HashMap, VecDeque}, @@ -216,6 +216,12 @@ where self.state_fetcher.update_peer_block(peer_id, hash, number); } + /// Invoked on a [`ForkId`] update + #[allow(unused)] + pub(crate) fn update_fork_id(&mut self, _fork_id: ForkId) { + todo!() + } + /// Invoked after a `NewBlock` message was received by the peer. /// /// This will keep track of blocks we know a peer has @@ -254,8 +260,11 @@ where /// Event hook for events received from the discovery service. fn on_discovery_event(&mut self, event: DiscoveryEvent) { match event { - DiscoveryEvent::Discovered(node, addr) => { - self.peers_manager.add_discovered_node(node, addr); + DiscoveryEvent::Discovered(peer, addr) => { + self.peers_manager.add_discovered_node(peer, addr); + } + DiscoveryEvent::EnrForkId(peer, fork_id) => { + self.peers_manager.set_discovered_fork_id(peer, fork_id); } } } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 044ab7b4c..c1eb9c155 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -13,7 +13,8 @@ use reth_network::{NetworkConfig, NetworkEvent, NetworkManager, PeersConfig}; use reth_primitives::PeerId; use reth_provider::test_utils::TestApi; use secp256k1::SecretKey; -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration}; +use tokio::task; #[tokio::test(flavor = "multi_thread")] async fn test_establish_connections() { @@ -111,6 +112,14 @@ async fn test_connect_with_builder() { tokio::join!(network, requests); }); + let h = handle.clone(); + task::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + dbg!(h.num_connected_peers()); + } + }); + while let Some(ev) = events.next().await { dbg!(ev); }