From 491b453edbd7bccb1a50539e9c2d2e8a119b15aa Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 16 Aug 2023 18:26:06 +0200 Subject: [PATCH] feat: track node record (#4224) --- Cargo.lock | 1 + crates/net/discv4/Cargo.toml | 1 + crates/net/discv4/src/lib.rs | 187 ++++++++++++++++++++--------------- 3 files changed, 107 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3540c434..968bff1ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5458,6 +5458,7 @@ dependencies = [ "enr 0.8.1", "generic-array", "hex", + "parking_lot 0.12.1", "rand 0.8.5", "reth-net-common", "reth-net-nat", diff --git a/crates/net/discv4/Cargo.toml b/crates/net/discv4/Cargo.toml index 2c5bd92cb..bd9a1f95d 100644 --- a/crates/net/discv4/Cargo.toml +++ b/crates/net/discv4/Cargo.toml @@ -30,6 +30,7 @@ tokio-stream.workspace = true # misc tracing.workspace = true thiserror.workspace = true +parking_lot.workspace = true hex = "0.4" rand = { workspace = true, optional = true } generic-array = "0.14" diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index 1ddedb1b3..478363da4 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -41,6 +41,7 @@ use discv5::{ ConnectionDirection, ConnectionState, }; use enr::{Enr, EnrBuilder}; +use parking_lot::Mutex; use proto::{EnrRequest, EnrResponse, EnrWrapper}; use reth_primitives::{ bytes::{Bytes, BytesMut}, @@ -137,7 +138,11 @@ pub struct Discv4 { /// The address of the udp socket local_addr: SocketAddr, /// channel to send commands over to the service - to_service: mpsc::Sender, + to_service: mpsc::UnboundedSender, + /// Tracks the local node record. + /// + /// This includes the currently tracked external IP address of the node. + node_record: Arc>, } // === impl Discv4 === @@ -163,10 +168,17 @@ impl Discv4 { /// NOTE: this is only intended for test setups. #[cfg(feature = "test-utils")] pub fn noop() -> Self { - let (to_service, _rx) = mpsc::channel(1); + let (to_service, _rx) = mpsc::unbounded_channel(); let local_addr = (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into(); - Self { local_addr, to_service } + Self { + local_addr, + to_service, + node_record: Arc::new(Mutex::new(NodeRecord::new( + "127.0.0.1:3030".parse().unwrap(), + PeerId::random(), + ))), + } } /// Binds a new UdpSocket and creates the service @@ -218,10 +230,8 @@ impl Discv4 { local_node_record.udp_port = local_addr.port(); trace!( target : "discv4", ?local_addr,"opened UDP socket"); - let (to_service, rx) = mpsc::channel(100); - let service = - Discv4Service::new(socket, local_addr, local_node_record, secret_key, config, Some(rx)); - let discv4 = Discv4 { local_addr, to_service }; + let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config); + let discv4 = service.handle(); Ok((discv4, service)) } @@ -230,9 +240,21 @@ impl Discv4 { self.local_addr } + /// Returns the [NodeRecord] of the local node. + /// + /// This includes the currently tracked external IP address of the node. + pub fn node_record(&self) -> NodeRecord { + *self.node_record.lock() + } + + /// Returns the currently tracked external IP of the node. + pub fn external_ip(&self) -> IpAddr { + self.node_record.lock().address + } + /// Sets the [Interval] used for periodically looking up targets over the network pub fn set_lookup_interval(&self, duration: Duration) { - self.safe_send_to_service(Discv4Command::SetLookupInterval(duration)) + self.send_to_service(Discv4Command::SetLookupInterval(duration)) } /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: @@ -261,7 +283,7 @@ impl Discv4 { async fn lookup_node(&self, node_id: Option) -> Result, Discv4Error> { let (tx, rx) = oneshot::channel(); let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) }; - self.to_service.send(cmd).await?; + self.to_service.send(cmd)?; Ok(rx.await?) } @@ -274,13 +296,13 @@ impl Discv4 { /// Removes the peer from the table, if it exists. pub fn remove_peer(&self, node_id: PeerId) { let cmd = Discv4Command::Remove(node_id); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Adds the node to the table, if it is not already present. pub fn add_node(&self, node_record: NodeRecord) { let cmd = Discv4Command::Add(node_record); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Adds the peer and id to the ban list. @@ -288,14 +310,14 @@ impl Discv4 { /// This will prevent any future inclusion in the table pub fn ban(&self, node_id: PeerId, ip: IpAddr) { let cmd = Discv4Command::Ban(node_id, ip); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Adds the ip to the ban list. /// /// This will prevent any future inclusion in the table pub fn ban_ip(&self, ip: IpAddr) { let cmd = Discv4Command::BanIp(ip); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Adds the peer to the ban list. @@ -303,7 +325,7 @@ impl Discv4 { /// This will prevent any future inclusion in the table pub fn ban_node(&self, node_id: PeerId) { let cmd = Discv4Command::BanPeer(node_id); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Sets the tcp port @@ -311,7 +333,7 @@ impl Discv4 { /// This will update our [`NodeRecord`]'s tcp port. pub fn set_tcp_port(&self, port: u16) { let cmd = Discv4Command::SetTcpPort(port); - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Sets the pair in the EIP-868 [`Enr`] of the node. @@ -321,7 +343,7 @@ impl Discv4 { /// CAUTION: The value **must** be rlp encoded pub fn set_eip868_rlp_pair(&self, key: Vec, rlp: Bytes) { let cmd = Discv4Command::SetEIP868RLPPair { key, rlp }; - self.safe_send_to_service(cmd); + self.send_to_service(cmd); } /// Sets the pair in the EIP-868 [`Enr`] of the node. @@ -333,13 +355,9 @@ impl Discv4 { self.set_eip868_rlp_pair(key, buf.freeze()) } - fn safe_send_to_service(&self, cmd: Discv4Command) { - // we want this message to always arrive, so we clone the sender - let _ = self.to_service.clone().try_send(cmd); - } - + #[inline] fn send_to_service(&self, cmd: Discv4Command) { - let _ = self.to_service.try_send(cmd).map_err(|err| { + let _ = self.to_service.send(cmd).map_err(|err| { debug!( target : "discv4", %err, @@ -352,7 +370,7 @@ impl Discv4 { pub async fn update_stream(&self) -> Result, Discv4Error> { let (tx, rx) = oneshot::channel(); let cmd = Discv4Command::Updates(tx); - self.to_service.send(cmd).await?; + self.to_service.send(cmd)?; Ok(rx.await?) } } @@ -366,6 +384,8 @@ pub struct Discv4Service { local_eip_868_enr: Enr, /// Local ENR of the server. local_node_record: NodeRecord, + /// Keeps track of the node record of the local node. + shared_node_record: Arc>, /// The secret key used to sign payloads secret_key: SecretKey, /// The UDP socket for sending and receiving messages. @@ -393,8 +413,10 @@ pub struct Discv4Service { pending_find_nodes: HashMap, /// Currently active ENR requests pending_enr_requests: HashMap, - /// Commands listener - commands_rx: Option>, + /// Copy of he sender half of the commands channel for [Discv4] + to_service: mpsc::UnboundedSender, + /// Receiver half of the commands channel for [Discv4] + commands_rx: mpsc::UnboundedReceiver, /// All subscribers for table updates update_listeners: Vec>, /// The interval when to trigger lookups @@ -425,7 +447,6 @@ impl Discv4Service { local_node_record: NodeRecord, secret_key: SecretKey, config: Discv4Config, - commands_rx: Option>, ) -> Self { let socket = Arc::new(socket); let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer); @@ -485,10 +506,15 @@ impl Discv4Service { builder.build(&secret_key).expect("v4 is set; qed") }; + let (to_service, commands_rx) = mpsc::unbounded_channel(); + + let shared_node_record = Arc::new(Mutex::new(local_node_record)); + Discv4Service { local_address, local_eip_868_enr, local_node_record, + shared_node_record, _socket: socket, kbuckets, secret_key, @@ -500,6 +526,7 @@ impl Discv4Service { pending_find_nodes: Default::default(), pending_enr_requests: Default::default(), commands_rx, + to_service, update_listeners: Vec::with_capacity(1), lookup_interval: self_lookup_interval, ping_interval, @@ -513,6 +540,15 @@ impl Discv4Service { } } + /// Returns the frontend handle that can communicate with the service via commands. + pub fn handle(&self) -> Discv4 { + Discv4 { + local_addr: self.local_address, + to_service: self.to_service.clone(), + node_record: self.shared_node_record.clone(), + } + } + /// Returns the current enr sequence fn enr_seq(&self) -> Option { (self.config.enable_eip868).then(|| self.local_eip_868_enr.seq()) @@ -530,6 +566,8 @@ impl Discv4Service { debug!(target : "discv4", ?external_ip, "Updating external ip"); self.local_node_record.address = external_ip; let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key); + let mut lock = self.shared_node_record.lock(); + *lock = self.local_node_record; debug!(target : "discv4", enr=?self.local_eip_868_enr, "Updated local ENR"); } } @@ -1457,63 +1495,48 @@ impl Discv4Service { self.set_external_ip_addr(ip); } - // process all incoming commands - if let Some(mut rx) = self.commands_rx.take() { - let mut is_done = false; - while let Poll::Ready(cmd) = rx.poll_recv(cx) { - if let Some(cmd) = cmd { - match cmd { - Discv4Command::Add(enr) => { - self.add_node(enr); - } - Discv4Command::Lookup { node_id, tx } => { - let node_id = node_id.unwrap_or(self.local_node_record.id); - self.lookup_with(node_id, tx); - } - Discv4Command::SetLookupInterval(duration) => { - self.set_lookup_interval(duration); - } - Discv4Command::Updates(tx) => { - let rx = self.update_stream(); - let _ = tx.send(rx); - } - Discv4Command::BanPeer(node_id) => self.ban_node(node_id), - Discv4Command::Remove(node_id) => { - self.remove_node(node_id); - } - Discv4Command::Ban(node_id, ip) => { - self.ban_node(node_id); - self.ban_ip(ip); - } - Discv4Command::BanIp(ip) => { - self.ban_ip(ip); - } - Discv4Command::SetEIP868RLPPair { key, rlp } => { - debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair"); - - let _ = self.local_eip_868_enr.insert_raw_rlp( - key, - rlp, - &self.secret_key, - ); - } - Discv4Command::SetTcpPort(port) => { - debug!(target: "discv4", %port, "Update tcp port"); - self.local_node_record.tcp_port = port; - if self.local_node_record.address.is_ipv4() { - let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key); - } else { - let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key); - } - } - } - } else { - is_done = true; - break + // process all incoming commands, this channel can never close + while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) { + match cmd { + Discv4Command::Add(enr) => { + self.add_node(enr); + } + Discv4Command::Lookup { node_id, tx } => { + let node_id = node_id.unwrap_or(self.local_node_record.id); + self.lookup_with(node_id, tx); + } + Discv4Command::SetLookupInterval(duration) => { + self.set_lookup_interval(duration); + } + Discv4Command::Updates(tx) => { + let rx = self.update_stream(); + let _ = tx.send(rx); + } + Discv4Command::BanPeer(node_id) => self.ban_node(node_id), + Discv4Command::Remove(node_id) => { + self.remove_node(node_id); + } + Discv4Command::Ban(node_id, ip) => { + self.ban_node(node_id); + self.ban_ip(ip); + } + Discv4Command::BanIp(ip) => { + self.ban_ip(ip); + } + Discv4Command::SetEIP868RLPPair { key, rlp } => { + debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair"); + + let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key); + } + Discv4Command::SetTcpPort(port) => { + debug!(target: "discv4", %port, "Update tcp port"); + self.local_node_record.tcp_port = port; + if self.local_node_record.address.is_ipv4() { + let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key); + } else { + let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key); + } } - } - if !is_done { - self.commands_rx = Some(rx); } }