feat: track node record (#4224)

This commit is contained in:
Matthias Seitz
2023-08-16 18:26:06 +02:00
committed by GitHub
parent b232e05b26
commit 491b453edb
3 changed files with 107 additions and 82 deletions

View File

@ -30,6 +30,7 @@ tokio-stream.workspace = true
# misc
tracing.workspace = true
thiserror.workspace = true
parking_lot.workspace = true
hex = "0.4"
rand = { workspace = true, optional = true }
generic-array = "0.14"

View File

@ -41,6 +41,7 @@ use discv5::{
ConnectionDirection, ConnectionState,
};
use enr::{Enr, EnrBuilder};
use parking_lot::Mutex;
use proto::{EnrRequest, EnrResponse, EnrWrapper};
use reth_primitives::{
bytes::{Bytes, BytesMut},
@ -137,7 +138,11 @@ pub struct Discv4 {
/// The address of the udp socket
local_addr: SocketAddr,
/// channel to send commands over to the service
to_service: mpsc::Sender<Discv4Command>,
to_service: mpsc::UnboundedSender<Discv4Command>,
/// Tracks the local node record.
///
/// This includes the currently tracked external IP address of the node.
node_record: Arc<Mutex<NodeRecord>>,
}
// === impl Discv4 ===
@ -163,10 +168,17 @@ impl Discv4 {
/// NOTE: this is only intended for test setups.
#[cfg(feature = "test-utils")]
pub fn noop() -> Self {
let (to_service, _rx) = mpsc::channel(1);
let (to_service, _rx) = mpsc::unbounded_channel();
let local_addr =
(IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
Self { local_addr, to_service }
Self {
local_addr,
to_service,
node_record: Arc::new(Mutex::new(NodeRecord::new(
"127.0.0.1:3030".parse().unwrap(),
PeerId::random(),
))),
}
}
/// Binds a new UdpSocket and creates the service
@ -218,10 +230,8 @@ impl Discv4 {
local_node_record.udp_port = local_addr.port();
trace!( target : "discv4", ?local_addr,"opened UDP socket");
let (to_service, rx) = mpsc::channel(100);
let service =
Discv4Service::new(socket, local_addr, local_node_record, secret_key, config, Some(rx));
let discv4 = Discv4 { local_addr, to_service };
let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
let discv4 = service.handle();
Ok((discv4, service))
}
@ -230,9 +240,21 @@ impl Discv4 {
self.local_addr
}
/// Returns the [NodeRecord] of the local node.
///
/// This includes the currently tracked external IP address of the node.
pub fn node_record(&self) -> NodeRecord {
*self.node_record.lock()
}
/// Returns the currently tracked external IP of the node.
pub fn external_ip(&self) -> IpAddr {
self.node_record.lock().address
}
/// Sets the [Interval] used for periodically looking up targets over the network
pub fn set_lookup_interval(&self, duration: Duration) {
self.safe_send_to_service(Discv4Command::SetLookupInterval(duration))
self.send_to_service(Discv4Command::SetLookupInterval(duration))
}
/// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
@ -261,7 +283,7 @@ impl Discv4 {
async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
let (tx, rx) = oneshot::channel();
let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
self.to_service.send(cmd).await?;
self.to_service.send(cmd)?;
Ok(rx.await?)
}
@ -274,13 +296,13 @@ impl Discv4 {
/// Removes the peer from the table, if it exists.
pub fn remove_peer(&self, node_id: PeerId) {
let cmd = Discv4Command::Remove(node_id);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Adds the node to the table, if it is not already present.
pub fn add_node(&self, node_record: NodeRecord) {
let cmd = Discv4Command::Add(node_record);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Adds the peer and id to the ban list.
@ -288,14 +310,14 @@ impl Discv4 {
/// This will prevent any future inclusion in the table
pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
let cmd = Discv4Command::Ban(node_id, ip);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Adds the ip to the ban list.
///
/// This will prevent any future inclusion in the table
pub fn ban_ip(&self, ip: IpAddr) {
let cmd = Discv4Command::BanIp(ip);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Adds the peer to the ban list.
@ -303,7 +325,7 @@ impl Discv4 {
/// This will prevent any future inclusion in the table
pub fn ban_node(&self, node_id: PeerId) {
let cmd = Discv4Command::BanPeer(node_id);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Sets the tcp port
@ -311,7 +333,7 @@ impl Discv4 {
/// This will update our [`NodeRecord`]'s tcp port.
pub fn set_tcp_port(&self, port: u16) {
let cmd = Discv4Command::SetTcpPort(port);
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Sets the pair in the EIP-868 [`Enr`] of the node.
@ -321,7 +343,7 @@ impl Discv4 {
/// CAUTION: The value **must** be rlp encoded
pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
self.safe_send_to_service(cmd);
self.send_to_service(cmd);
}
/// Sets the pair in the EIP-868 [`Enr`] of the node.
@ -333,13 +355,9 @@ impl Discv4 {
self.set_eip868_rlp_pair(key, buf.freeze())
}
fn safe_send_to_service(&self, cmd: Discv4Command) {
// we want this message to always arrive, so we clone the sender
let _ = self.to_service.clone().try_send(cmd);
}
#[inline]
fn send_to_service(&self, cmd: Discv4Command) {
let _ = self.to_service.try_send(cmd).map_err(|err| {
let _ = self.to_service.send(cmd).map_err(|err| {
debug!(
target : "discv4",
%err,
@ -352,7 +370,7 @@ impl Discv4 {
pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
let (tx, rx) = oneshot::channel();
let cmd = Discv4Command::Updates(tx);
self.to_service.send(cmd).await?;
self.to_service.send(cmd)?;
Ok(rx.await?)
}
}
@ -366,6 +384,8 @@ pub struct Discv4Service {
local_eip_868_enr: Enr<SecretKey>,
/// Local ENR of the server.
local_node_record: NodeRecord,
/// Keeps track of the node record of the local node.
shared_node_record: Arc<Mutex<NodeRecord>>,
/// The secret key used to sign payloads
secret_key: SecretKey,
/// The UDP socket for sending and receiving messages.
@ -393,8 +413,10 @@ pub struct Discv4Service {
pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
/// Currently active ENR requests
pending_enr_requests: HashMap<PeerId, EnrRequestState>,
/// Commands listener
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
/// Copy of he sender half of the commands channel for [Discv4]
to_service: mpsc::UnboundedSender<Discv4Command>,
/// Receiver half of the commands channel for [Discv4]
commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
/// All subscribers for table updates
update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
/// The interval when to trigger lookups
@ -425,7 +447,6 @@ impl Discv4Service {
local_node_record: NodeRecord,
secret_key: SecretKey,
config: Discv4Config,
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
) -> Self {
let socket = Arc::new(socket);
let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
@ -485,10 +506,15 @@ impl Discv4Service {
builder.build(&secret_key).expect("v4 is set; qed")
};
let (to_service, commands_rx) = mpsc::unbounded_channel();
let shared_node_record = Arc::new(Mutex::new(local_node_record));
Discv4Service {
local_address,
local_eip_868_enr,
local_node_record,
shared_node_record,
_socket: socket,
kbuckets,
secret_key,
@ -500,6 +526,7 @@ impl Discv4Service {
pending_find_nodes: Default::default(),
pending_enr_requests: Default::default(),
commands_rx,
to_service,
update_listeners: Vec::with_capacity(1),
lookup_interval: self_lookup_interval,
ping_interval,
@ -513,6 +540,15 @@ impl Discv4Service {
}
}
/// Returns the frontend handle that can communicate with the service via commands.
pub fn handle(&self) -> Discv4 {
Discv4 {
local_addr: self.local_address,
to_service: self.to_service.clone(),
node_record: self.shared_node_record.clone(),
}
}
/// Returns the current enr sequence
fn enr_seq(&self) -> Option<u64> {
(self.config.enable_eip868).then(|| self.local_eip_868_enr.seq())
@ -530,6 +566,8 @@ impl Discv4Service {
debug!(target : "discv4", ?external_ip, "Updating external ip");
self.local_node_record.address = external_ip;
let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
let mut lock = self.shared_node_record.lock();
*lock = self.local_node_record;
debug!(target : "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
}
}
@ -1457,63 +1495,48 @@ impl Discv4Service {
self.set_external_ip_addr(ip);
}
// process all incoming commands
if let Some(mut rx) = self.commands_rx.take() {
let mut is_done = false;
while let Poll::Ready(cmd) = rx.poll_recv(cx) {
if let Some(cmd) = cmd {
match cmd {
Discv4Command::Add(enr) => {
self.add_node(enr);
}
Discv4Command::Lookup { node_id, tx } => {
let node_id = node_id.unwrap_or(self.local_node_record.id);
self.lookup_with(node_id, tx);
}
Discv4Command::SetLookupInterval(duration) => {
self.set_lookup_interval(duration);
}
Discv4Command::Updates(tx) => {
let rx = self.update_stream();
let _ = tx.send(rx);
}
Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
Discv4Command::Remove(node_id) => {
self.remove_node(node_id);
}
Discv4Command::Ban(node_id, ip) => {
self.ban_node(node_id);
self.ban_ip(ip);
}
Discv4Command::BanIp(ip) => {
self.ban_ip(ip);
}
Discv4Command::SetEIP868RLPPair { key, rlp } => {
debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
let _ = self.local_eip_868_enr.insert_raw_rlp(
key,
rlp,
&self.secret_key,
);
}
Discv4Command::SetTcpPort(port) => {
debug!(target: "discv4", %port, "Update tcp port");
self.local_node_record.tcp_port = port;
if self.local_node_record.address.is_ipv4() {
let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
} else {
let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
}
}
}
} else {
is_done = true;
break
// process all incoming commands, this channel can never close
while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
match cmd {
Discv4Command::Add(enr) => {
self.add_node(enr);
}
Discv4Command::Lookup { node_id, tx } => {
let node_id = node_id.unwrap_or(self.local_node_record.id);
self.lookup_with(node_id, tx);
}
Discv4Command::SetLookupInterval(duration) => {
self.set_lookup_interval(duration);
}
Discv4Command::Updates(tx) => {
let rx = self.update_stream();
let _ = tx.send(rx);
}
Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
Discv4Command::Remove(node_id) => {
self.remove_node(node_id);
}
Discv4Command::Ban(node_id, ip) => {
self.ban_node(node_id);
self.ban_ip(ip);
}
Discv4Command::BanIp(ip) => {
self.ban_ip(ip);
}
Discv4Command::SetEIP868RLPPair { key, rlp } => {
debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
}
Discv4Command::SetTcpPort(port) => {
debug!(target: "discv4", %port, "Update tcp port");
self.local_node_record.tcp_port = port;
if self.local_node_record.address.is_ipv4() {
let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
} else {
let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
}
}
}
if !is_done {
self.commands_rx = Some(rx);
}
}