feat(net): add discv4 crate (#113)

* port kad

* feat: port kad bucket

* feat: add discv4

* chore: rustfmt

* cargo update

* just reuse discv5 table

* test: add rlp tests

* message encoding

* feat: impl codec roundtrip testing

* more work in message handling

* implement ping

* feat: impl commands

* cleanup

* more cleanup

* trim config

* more docs

* feat: implement recursive lookup

* docs

* cleanup config

* feat: implement update stream

* chore: config cleanup

* docs: add crate docs

* feat: more testing

* fix deny

* clarify ring

* docs: more docs

* use discv5 master

* docs: address review and add comments

* update readme

* rustmft

* chore(clippy): make clippy happy
This commit is contained in:
Matthias Seitz
2022-10-25 14:23:24 +02:00
committed by GitHub
parent 2b67e75c05
commit ce64fefd78
11 changed files with 3230 additions and 61 deletions

937
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,7 @@ members = [
"crates/net/p2p", "crates/net/p2p",
"crates/net/ecies", "crates/net/ecies",
"crates/net/eth-wire", "crates/net/eth-wire",
"crates/net/discv4",
"crates/net/rpc", "crates/net/rpc",
"crates/net/rpc-api", "crates/net/rpc-api",
"crates/net/rpc-types", "crates/net/rpc-types",

View File

@ -0,0 +1,42 @@
[package]
name = "reth-discv4"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = """
Ethereum network support
"""
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-rlp = { path = "../../common/rlp" }
reth-rlp-derive = { path = "../../common/rlp-derive" }
# ethereum
discv5 = { git = "https://github.com/sigp/discv5" }
secp256k1 = { version = "0.24", features = [
"global-context",
"rand-std",
"recovery",
] }
# async/futures
tokio = { version = "1", features = ["io-util", "net", "time"] }
tokio-stream = "0.1"
# misc
generic-array = "0.14"
tracing = "0.1"
bytes = "1.2"
thiserror = "1.0"
url = "2.3"
hex = "0.4"
public-ip = "0.2"
[dev-dependencies]
rand = "0.8"
tokio = { version = "1", features = ["full"] }
tracing-test = "0.2"

View File

@ -0,0 +1,22 @@
# <h1 align="center"> discv4 </h1>
This is a rust implementation of
the [Discovery v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv4.md)
peer discovery protocol.
For comparison to Discovery v5,
see [discv5#comparison-with-node-discovery-v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv5/discv5.md#comparison-with-node-discovery-v4)
This is inspired by the [discv5](https://github.com/sigp/discv5) crate and reuses its kademlia implementation.
## Finding peers
The discovery service continuously attempts to connect to other nodes on the network until it has found enough peers.
If UPnP (Universal Plug and Play) is supported by the router the service is running on, it will also accept connections
from external nodes. In the discovery protocol, nodes exchange information about where the node can be reached to
eventually establish RLPx sessions.
## Trouble Shooting
The discv4 protocol depends on the local system clock. If the clock is not accurate it can cause connectivity issues
because the expiration timestamps might be wrong.

View File

@ -0,0 +1,61 @@
//! Various known bootstrap nodes for networks
// <https://github.com/ledgerwatch/erigon/blob/610e648dc43ec8cd6563313e28f06f534a9091b3/params/bootnodes.go>
use crate::node::NodeRecord;
/// Ethereum Foundation Go Bootnodes
pub static MAINNET_BOOTNODES : [&str; 8] = [
"enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001
"enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001
"enode://8499da03c47d637b20eee24eec3c356c9a2e6148d6fe25ca195c7949ab8ec2c03e3556126b0d7ed644675e78c4318b08691b7b57de10e5f0d40d05b09238fa0a@52.187.207.27:30303", // bootnode-azure-australiaeast-001
"enode://103858bdb88756c71f15e9b5e09b56dc1be52f0a5021d46301dbbfb7e130029cc9d0d6f73f693bc29b665770fff7da4d34f3c6379fe12721b5d7a0bcb5ca1fc1@191.234.162.198:30303", // bootnode-azure-brazilsouth-001
"enode://715171f50508aba88aecd1250af392a45a330af91d7b90701c436b618c86aaa1589c9184561907bebbb56439b8f8787bc01f49a7c77276c58c1b09822d75e8e8@52.231.165.108:30303", // bootnode-azure-koreasouth-001
"enode://5d6d7cd20d6da4bb83a1d28cadb5d409b64edf314c0335df658c1a54e32c7c4a7ab7823d57c39b6a757556e68ff1df17c748b698544a55cb488b52479a92b60f@104.42.217.25:30303", // bootnode-azure-westus-001
"enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", // bootnode-hetzner-hel
"enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", // bootnode-hetzner-fsn
];
/// SEPOLIA BOOTNODES
pub static SEPOLIA_BOOTNODES : [&str; 2] = [
// geth
"enode://9246d00bc8fd1742e5ad2428b80fc4dc45d786283e05ef6edbd9002cbc335d40998444732fbe921cb88e1d2c73d1b1de53bae6a2237996e9bfe14f871baf7066@18.168.182.86:30303",
// besu
"enode://ec66ddcf1a974950bd4c782789a7e04f8aa7110a72569b6e65fcd51e937e74eed303b1ea734e4d19cfaec9fbff9b6ee65bf31dcb50ba79acce9dd63a6aca61c7@52.14.151.177:30303",
];
/// GOERLI bootnodes
pub static GOERLI_BOOTNODES : [&str; 7] = [
// Upstream bootnodes
"enode://011f758e6552d105183b1761c5e2dea0111bc20fd5f6422bc7f91e0fabbec9a6595caf6239b37feb773dddd3f87240d99d859431891e4a642cf2a0a9e6cbb98a@51.141.78.53:30303",
"enode://176b9417f511d05b6b2cf3e34b756cf0a7096b3094572a8f6ef4cdcb9d1f9d00683bf0f83347eebdf3b81c3521c2332086d9592802230bf528eaf606a1d9677b@13.93.54.137:30303",
"enode://46add44b9f13965f7b9875ac6b85f016f341012d84f975377573800a863526f4da19ae2c620ec73d11591fa9510e992ecc03ad0751f53cc02f7c7ed6d55c7291@94.237.54.114:30313",
"enode://b5948a2d3e9d486c4d75bf32713221c2bd6cf86463302339299bd227dc2e276cd5a1c7ca4f43a0e9122fe9af884efed563bd2a1fd28661f3b5f5ad7bf1de5949@18.218.250.66:30303",
// Ethereum Foundation bootnode
"enode://a61215641fb8714a373c80edbfa0ea8878243193f57c96eeb44d0bc019ef295abd4e044fd619bfc4c59731a73fb79afe84e9ab6da0c743ceb479cbb6d263fa91@3.11.147.67:30303",
// Goerli Initiative bootnodes
"enode://d4f764a48ec2a8ecf883735776fdefe0a3949eb0ca476bd7bc8d0954a9defe8fea15ae5da7d40b5d2d59ce9524a99daedadf6da6283fca492cc80b53689fb3b3@46.4.99.122:32109",
"enode://d2b720352e8216c9efc470091aa91ddafc53e222b32780f505c817ceef69e01d5b0b0797b69db254c586f493872352f5a022b4d8479a00fc92ec55f9ad46a27e@88.99.70.182:30303",
];
/// Returns parsed mainnet nodes
pub fn mainnet_nodes() -> Vec<NodeRecord> {
parse_nodes(&MAINNET_BOOTNODES[..])
}
/// Returns parsed goerli nodes
pub fn goerli_nodes() -> Vec<NodeRecord> {
parse_nodes(&GOERLI_BOOTNODES[..])
}
/// Returns parsed sepolia nodes
pub fn sepolia_nodes() -> Vec<NodeRecord> {
parse_nodes(&SEPOLIA_BOOTNODES[..])
}
/// Parses all the nodes
pub fn parse_nodes(nodes: impl IntoIterator<Item = impl AsRef<str>>) -> Vec<NodeRecord> {
nodes.into_iter().map(|s| s.as_ref().parse().unwrap()).collect()
}

View File

@ -0,0 +1,128 @@
use crate::node::NodeRecord;
use discv5::PermitBanList;
///! A set of configuration parameters to tune the discovery protocol.
// This basis of this file has been taken from the discv5 codebase:
// https://github.com/sigp/discv5
use std::collections::HashSet;
use std::time::Duration;
/// Configuration parameters that define the performance of the discovery network.
#[derive(Clone, Debug)]
pub struct Discv4Config {
/// Whether to enable the incoming packet filter. Default: false.
pub enable_packet_filter: bool,
/// The number of retries for each UDP request. Default: 1.
pub request_retries: u8,
/// The time between pings to ensure connectivity amongst connected nodes. Default: 300
/// seconds.
pub ping_interval: Duration,
/// The duration of we consider a ping timed out.
pub ping_timeout: Duration,
/// The rate at which lookups should be triggered.
pub lookup_interval: Duration,
/// The duration of we consider a FindNode request timed out.
pub find_node_timeout: Duration,
/// The duration we set for neighbours responses
pub neighbours_timeout: Duration,
/// A set of lists that permit or ban IP's or NodeIds from the server. See
/// `crate::PermitBanList`.
pub permit_ban_list: PermitBanList,
/// Set the default duration for which nodes are banned for. This timeouts are checked every 5
/// minutes, so the precision will be to the nearest 5 minutes. If set to `None`, bans from
/// the filter will last indefinitely. Default is 1 hour.
pub ban_duration: Option<Duration>,
/// Nodes to boot from.
pub bootstrap_nodes: HashSet<NodeRecord>,
}
impl Discv4Config {
/// Returns a new default builder instance
pub fn builder() -> Discv4ConfigBuilder {
Default::default()
}
}
impl Default for Discv4Config {
fn default() -> Self {
Self {
enable_packet_filter: false,
request_retries: 1,
ping_interval: Duration::from_secs(300),
ping_timeout: Duration::from_secs(5),
find_node_timeout: Duration::from_secs(2),
neighbours_timeout: Duration::from_secs(30),
lookup_interval: Duration::from_secs(20),
permit_ban_list: PermitBanList::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
bootstrap_nodes: Default::default(),
}
}
}
#[derive(Debug, Default)]
pub struct Discv4ConfigBuilder {
config: Discv4Config,
}
impl Discv4ConfigBuilder {
/// Whether to enable the incoming packet filter.
pub fn enable_packet_filter(&mut self) -> &mut Self {
self.config.enable_packet_filter = true;
self
}
/// The number of retries for each UDP request.
pub fn request_retries(&mut self, retries: u8) -> &mut Self {
self.config.request_retries = retries;
self
}
/// The time between pings to ensure connectivity amongst connected nodes.
pub fn ping_interval(&mut self, interval: Duration) -> &mut Self {
self.config.ping_interval = interval;
self
}
/// Sets the timeout for pings
pub fn ping_timeout(&mut self, duration: Duration) -> &mut Self {
self.config.ping_timeout = duration;
self
}
/// A set of lists that permit or ban IP's or NodeIds from the server. See
/// `crate::PermitBanList`.
pub fn permit_ban_list(&mut self, list: PermitBanList) -> &mut Self {
self.config.permit_ban_list = list;
self
}
/// Sets the lookup interval duration.
pub fn lookup_interval(&mut self, lookup_interval: Duration) -> &mut Self {
self.config.lookup_interval = lookup_interval;
self
}
/// Set the default duration for which nodes are banned for. This timeouts are checked every 5
/// minutes, so the precision will be to the nearest 5 minutes. If set to `None`, bans from
/// the filter will last indefinitely. Default is 1 hour.
pub fn ban_duration(&mut self, ban_duration: Option<Duration>) -> &mut Self {
self.config.ban_duration = ban_duration;
self
}
/// Adds a boot node
pub fn add_boot_node(&mut self, node: NodeRecord) -> &mut Self {
self.config.bootstrap_nodes.insert(node);
self
}
/// Adds multiple boot nodes
pub fn add_boot_nodes(&mut self, nodes: impl IntoIterator<Item = NodeRecord>) -> &mut Self {
self.config.bootstrap_nodes.extend(nodes);
self
}
pub fn build(&mut self) -> Discv4Config {
self.config.clone()
}
}

View File

@ -0,0 +1,36 @@
//! Error types that can occur in this crate.
use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
/// Error thrown when decoding a UDP packet.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum DecodePacketError {
#[error("Failed to rlp decode: {0:?}")]
Rlp(#[from] reth_rlp::DecodeError),
#[error("Received packet len too short.")]
PacketTooShort,
#[error("Hash of the header not equals to the hash of the data.")]
HashMismatch,
#[error("Message id {0} is not supported.")]
UnknownMessage(u8),
#[error("Failed to recover public key: {0:?}")]
Secp256k1(#[from] secp256k1::Error),
}
/// High level errors that can occur when interacting with the discovery service
#[derive(Debug, thiserror::Error)]
pub enum Discv4Error {
/// Failed to send a command over the channel
#[error("Failed to send on a closed channel")]
Send,
/// Failed to receive a command response
#[error(transparent)]
Receive(#[from] RecvError),
}
impl<T> From<SendError<T>> for Discv4Error {
fn from(_: SendError<T>) -> Self {
Discv4Error::Send
}
}

1273
crates/net/discv4/src/lib.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,211 @@
use crate::{proto::Octets, NodeId};
use bytes::{Buf, BufMut};
use generic_array::GenericArray;
use reth_primitives::keccak256;
use reth_rlp::{Decodable, DecodeError, Encodable};
use reth_rlp_derive::RlpEncodable;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
};
use url::{Host, Url};
/// The key type for the table.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct NodeKey(pub(crate) NodeId);
impl From<NodeId> for NodeKey {
fn from(value: NodeId) -> Self {
NodeKey(value)
}
}
impl From<NodeKey> for discv5::Key<NodeKey> {
fn from(value: NodeKey) -> Self {
let hash = keccak256(value.0.as_bytes());
let hash = *GenericArray::from_slice(hash.as_bytes());
discv5::Key::new_raw(value, hash)
}
}
/// Converts a `NodeId` into the required `Key` type for the table
#[inline]
pub(crate) fn kad_key(node: NodeId) -> discv5::Key<NodeKey> {
discv5::kbucket::Key::from(NodeKey::from(node))
}
/// Represents a ENR in discv4.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct NodeRecord {
/// The Address of a node.
pub address: IpAddr,
/// TCP port of the port that accepts connections.
pub tcp_port: u16,
/// UDP discovery port.
pub udp_port: u16,
/// Public key of the discovery service
pub id: NodeId,
}
impl NodeRecord {
/// Creates a new record
#[allow(unused)]
pub(crate) fn new(addr: SocketAddr, id: NodeId) -> Self {
Self { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), id }
}
/// The TCP socket address of this node
#[must_use]
pub fn tcp_addr(&self) -> SocketAddr {
SocketAddr::new(self.address, self.tcp_port)
}
/// The UDP socket address of this node
#[must_use]
pub fn udp_addr(&self) -> SocketAddr {
SocketAddr::new(self.address, self.udp_port)
}
/// Returns the key type for the kademlia table
#[must_use]
#[inline]
pub(crate) fn key(&self) -> discv5::Key<NodeKey> {
NodeKey(self.id).into()
}
}
/// Possible error types when parsing a `NodeRecord`
#[derive(Debug, thiserror::Error)]
pub enum NodeRecordParseError {
#[error("Failed to parse url: {0}")]
InvalidUrl(String),
#[error("Failed to parse id")]
InvalidId(String),
}
impl FromStr for NodeRecord {
type Err = NodeRecordParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let url = Url::parse(s).map_err(|e| NodeRecordParseError::InvalidUrl(e.to_string()))?;
let address = match url.host() {
Some(Host::Ipv4(ip)) => IpAddr::V4(ip),
Some(Host::Ipv6(ip)) => IpAddr::V6(ip),
Some(Host::Domain(ip)) => IpAddr::V4(
Ipv4Addr::from_str(ip)
.map_err(|e| NodeRecordParseError::InvalidUrl(e.to_string()))?,
),
_ => return Err(NodeRecordParseError::InvalidUrl(format!("invalid host: {url:?}"))),
};
let port = url
.port()
.ok_or_else(|| NodeRecordParseError::InvalidUrl("no port specified".to_string()))?;
let id = url
.username()
.parse::<NodeId>()
.map_err(|e| NodeRecordParseError::InvalidId(e.to_string()))?;
Ok(Self { address, id, tcp_port: port, udp_port: port })
}
}
impl Encodable for NodeRecord {
fn encode(&self, out: &mut dyn BufMut) {
#[derive(RlpEncodable)]
struct EncodeNode {
octets: Octets,
udp_port: u16,
tcp_port: u16,
id: NodeId,
}
let octets = match self.address {
IpAddr::V4(addr) => Octets::V4(addr.octets()),
IpAddr::V6(addr) => Octets::V6(addr.octets()),
};
let node =
EncodeNode { octets, udp_port: self.udp_port, tcp_port: self.tcp_port, id: self.id };
node.encode(out)
}
}
impl Decodable for NodeRecord {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let b = &mut &**buf;
let rlp_head = reth_rlp::Header::decode(b)?;
if !rlp_head.list {
return Err(DecodeError::UnexpectedString)
}
let started_len = b.len();
let octets = Octets::decode(b)?;
let this = Self {
address: octets.into(),
udp_port: Decodable::decode(b)?,
tcp_port: Decodable::decode(b)?,
id: Decodable::decode(b)?,
};
// the ENR record can contain additional entries that we skip
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
expected: rlp_head.payload_length,
got: consumed,
})
}
let rem = rlp_head.payload_length - consumed;
b.advance(rem);
*buf = *b;
Ok(this)
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
use rand::{thread_rng, Rng, RngCore};
#[test]
fn test_noderecord_codec_ipv4() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 4];
rng.fill_bytes(&mut ip);
let record = NodeRecord {
address: IpAddr::V4(ip.into()),
tcp_port: rng.gen(),
udp_port: rng.gen(),
id: NodeId::random(),
};
let mut buf = BytesMut::new();
record.encode(&mut buf);
let decoded = NodeRecord::decode(&mut buf.as_ref()).unwrap();
assert_eq!(record, decoded);
}
}
#[test]
fn test_noderecord_codec_ipv6() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let record = NodeRecord {
address: IpAddr::V6(ip.into()),
tcp_port: rng.gen(),
udp_port: rng.gen(),
id: NodeId::random(),
};
let mut buf = BytesMut::new();
record.encode(&mut buf);
let decoded = NodeRecord::decode(&mut buf.as_ref()).unwrap();
assert_eq!(record, decoded);
}
}
}

View File

@ -0,0 +1,578 @@
#![allow(missing_docs)]
use crate::{error::DecodePacketError, node::NodeRecord, NodeId, MAX_PACKET_SIZE, MIN_PACKET_SIZE};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use reth_primitives::{keccak256, H256};
use reth_rlp::{Decodable, DecodeError, Encodable, Header};
use reth_rlp_derive::{RlpDecodable, RlpEncodable};
use secp256k1::{
ecdsa::{RecoverableSignature, RecoveryId},
SecretKey, SECP256K1,
};
use std::net::{IpAddr, Ipv6Addr};
// Note: this is adapted from https://github.com/vorot93/discv4
/// Id for message variants.
#[derive(Debug)]
#[repr(u8)]
pub enum MessageId {
Ping = 1,
Pong = 2,
FindNode = 3,
Neighbours = 4,
}
impl MessageId {
/// Converts the byte that represents the message id to the enum.
fn from_u8(msg: u8) -> Result<Self, u8> {
let msg = match msg {
1 => MessageId::Ping,
2 => MessageId::Pong,
3 => MessageId::FindNode,
4 => MessageId::Neighbours,
_ => return Err(msg),
};
Ok(msg)
}
}
/// All message variants
#[derive(Debug, Eq, PartialEq)]
pub enum Message {
Ping(Ping),
Pong(Pong),
FindNode(FindNode),
Neighbours(Neighbours),
}
// === impl Message ===
impl Message {
/// Returns the id for this type
pub fn msg_type(&self) -> MessageId {
match self {
Message::Ping(_) => MessageId::Ping,
Message::Pong(_) => MessageId::Pong,
Message::FindNode(_) => MessageId::FindNode,
Message::Neighbours(_) => MessageId::Neighbours,
}
}
/// Encodes the UDP datagram, See <https://github.com/ethereum/devp2p/blob/master/discv4.md#wire-protocol>
///
/// The datagram is `header || payload`
/// where header is `hash || signature || packet-type`
pub fn encode(&self, secret_key: &SecretKey) -> (Bytes, H256) {
// allocate max packet size
let mut datagram = BytesMut::with_capacity(MAX_PACKET_SIZE);
// since signature has fixed len, we can split and fill the datagram buffer at fixed
// positions, this way we can encode the message directly in the datagram buffer
let mut sig_bytes = datagram.split_off(H256::len_bytes());
let mut payload = sig_bytes.split_off(secp256k1::constants::COMPACT_SIGNATURE_SIZE + 1);
match self {
Message::Ping(message) => {
payload.put_u8(1);
message.encode(&mut payload);
}
Message::Pong(message) => {
payload.put_u8(2);
message.encode(&mut payload);
}
Message::FindNode(message) => {
payload.put_u8(3);
message.encode(&mut payload);
}
Message::Neighbours(message) => {
payload.put_u8(4);
message.encode(&mut payload);
}
}
let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable(
&secp256k1::Message::from_slice(keccak256(&payload).as_ref())
.expect("is correct MESSAGE_SIZE; qed"),
secret_key,
);
let (rec, sig) = signature.serialize_compact();
sig_bytes.extend_from_slice(&sig);
sig_bytes.put_u8(rec.to_i32() as u8);
sig_bytes.unsplit(payload);
let hash = keccak256(&sig_bytes);
datagram.extend_from_slice(hash.as_bytes());
datagram.unsplit(sig_bytes);
(datagram.freeze(), hash)
}
/// Decodes the [`Message`] from the given buffer.
///
/// Returns the decoded message and the public key of the sender.
pub fn decode(packet: &[u8]) -> Result<Packet, DecodePacketError> {
if packet.len() < MIN_PACKET_SIZE {
return Err(DecodePacketError::PacketTooShort)
}
// parses the wire-protocol, every packet starts with a header:
// packet-header = hash || signature || packet-type
// hash = keccak256(signature || packet-type || packet-data)
// signature = sign(packet-type || packet-data)
let header_hash = keccak256(&packet[32..]);
let data_hash = H256::from_slice(&packet[..32]);
if data_hash != header_hash {
return Err(DecodePacketError::HashMismatch)
}
let signature = &packet[32..96];
let recovery_id = RecoveryId::from_i32(packet[96] as i32)?;
let recoverable_sig = RecoverableSignature::from_compact(signature, recovery_id)?;
// recover the public key
let msg = secp256k1::Message::from_slice(keccak256(&packet[97..]).as_bytes())?;
let pk = SECP256K1.recover_ecdsa(&msg, &recoverable_sig)?;
let node_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
let msg_type = packet[97];
let payload = &mut &packet[98..];
let msg = match MessageId::from_u8(msg_type).map_err(DecodePacketError::UnknownMessage)? {
MessageId::Ping => Message::Ping(Ping::decode(payload)?),
MessageId::Pong => Message::Pong(Pong::decode(payload)?),
MessageId::FindNode => Message::FindNode(FindNode::decode(payload)?),
MessageId::Neighbours => Message::Neighbours(Neighbours::decode(payload)?),
};
Ok(Packet { msg, node_id, hash: header_hash })
}
}
/// Decoded packet
#[derive(Debug)]
pub struct Packet {
pub msg: Message,
pub node_id: NodeId,
pub hash: H256,
}
/// Represents the `from`, `to` fields in the packets
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct NodeEndpoint {
pub address: IpAddr,
pub udp_port: u16,
pub tcp_port: u16,
}
impl Decodable for NodeEndpoint {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let b = &mut &**buf;
let rlp_head = Header::decode(b)?;
if !rlp_head.list {
return Err(DecodeError::UnexpectedString)
}
let started_len = b.len();
let octets = Octets::decode(b)?;
let this = Self {
address: octets.into(),
udp_port: Decodable::decode(b)?,
tcp_port: Decodable::decode(b)?,
};
// the ENR record can contain additional entries that we skip
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
expected: rlp_head.payload_length,
got: consumed,
})
}
let rem = rlp_head.payload_length - consumed;
b.advance(rem);
*buf = *b;
Ok(this)
}
}
impl Encodable for NodeEndpoint {
fn encode(&self, out: &mut dyn BufMut) {
#[derive(RlpEncodable)]
struct RlpEndpoint {
octets: Octets,
udp_port: u16,
tcp_port: u16,
}
let octets = match self.address {
IpAddr::V4(addr) => Octets::V4(addr.octets()),
IpAddr::V6(addr) => Octets::V6(addr.octets()),
};
let p = RlpEndpoint { octets, udp_port: self.udp_port, tcp_port: self.tcp_port };
p.encode(out)
}
}
impl From<NodeRecord> for NodeEndpoint {
fn from(NodeRecord { address, tcp_port, udp_port, .. }: NodeRecord) -> Self {
Self { address, tcp_port, udp_port }
}
}
/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03).).
#[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
pub struct FindNode {
pub id: NodeId,
pub expire: u64,
}
/// A [Neighbours packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#neighbors-packet-0x04).
#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
pub struct Neighbours {
pub nodes: Vec<NodeRecord>,
pub expire: u64,
}
/// A [Ping packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Ping {
pub from: NodeEndpoint,
pub to: NodeEndpoint,
pub expire: u64,
}
impl Encodable for Ping {
fn encode(&self, out: &mut dyn BufMut) {
#[derive(RlpEncodable)]
struct V4PingMessage<'a> {
version: u32,
from: &'a NodeEndpoint,
to: &'a NodeEndpoint,
expire: u64,
}
V4PingMessage {
version: 4, // version 4
from: &self.from,
to: &self.to,
expire: self.expire,
}
.encode(out)
}
}
impl Decodable for Ping {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let b = &mut &**buf;
let rlp_head = Header::decode(b)?;
if !rlp_head.list {
return Err(DecodeError::UnexpectedString)
}
let started_len = b.len();
let _version = u32::decode(b)?;
let this = Self {
from: Decodable::decode(b)?,
to: Decodable::decode(b)?,
expire: Decodable::decode(b)?,
};
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
expected: rlp_head.payload_length,
got: consumed,
})
}
let rem = rlp_head.payload_length - consumed;
b.advance(rem);
*buf = *b;
Ok(this)
}
}
/// A [Pong packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#pong-packet-0x02).
// #[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)]
#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)]
pub struct Pong {
pub to: NodeEndpoint,
pub echo: H256,
pub expire: u64,
}
impl Decodable for Pong {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let b = &mut &**buf;
let rlp_head = Header::decode(b)?;
if !rlp_head.list {
return Err(DecodeError::UnexpectedString)
}
let started_len = b.len();
let this = Self {
to: Decodable::decode(b)?,
echo: Decodable::decode(b)?,
expire: Decodable::decode(b)?,
};
let consumed = started_len - b.len();
if consumed > rlp_head.payload_length {
return Err(DecodeError::ListLengthMismatch {
expected: rlp_head.payload_length,
got: consumed,
})
}
let rem = rlp_head.payload_length - consumed;
b.advance(rem);
*buf = *b;
Ok(this)
}
}
/// IpAddr octets
#[derive(Debug, Copy, Clone)]
pub(crate) enum Octets {
V4([u8; 4]),
V6([u8; 16]),
}
impl From<Octets> for IpAddr {
fn from(value: Octets) -> Self {
match value {
Octets::V4(o) => IpAddr::from(o),
Octets::V6(o) => {
let ipv6 = Ipv6Addr::from(o);
// If the ipv6 is ipv4 compatible/mapped, simply return the ipv4.
if let Some(ipv4) = ipv6.to_ipv4() {
IpAddr::V4(ipv4)
} else {
IpAddr::V6(ipv6)
}
}
}
}
}
impl Encodable for Octets {
fn encode(&self, out: &mut dyn BufMut) {
let octets = match self {
Octets::V4(ref o) => &o[..],
Octets::V6(ref o) => &o[..],
};
octets.encode(out)
}
}
impl Decodable for Octets {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let h = Header::decode(buf)?;
if h.list {
return Err(DecodeError::UnexpectedList)
}
let o = match h.payload_length {
4 => {
let mut to = [0_u8; 4];
to.copy_from_slice(&buf[..4]);
Octets::V4(to)
}
16 => {
let mut to = [0u8; 16];
to.copy_from_slice(&buf[..16]);
Octets::V6(to)
}
_ => return Err(DecodeError::UnexpectedLength),
};
buf.advance(h.payload_length);
Ok(o)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS;
use bytes::BytesMut;
use rand::{thread_rng, Rng, RngCore};
fn rng_endpoint(rng: &mut impl Rng) -> NodeEndpoint {
let address = if rng.gen() {
let mut ip = [0u8; 4];
rng.fill_bytes(&mut ip);
IpAddr::V4(ip.into())
} else {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
IpAddr::V6(ip.into())
};
NodeEndpoint { address, tcp_port: rng.gen(), udp_port: rng.gen() }
}
fn rng_record(rng: &mut impl RngCore) -> NodeRecord {
let NodeEndpoint { address, udp_port, tcp_port } = rng_endpoint(rng);
NodeRecord { address, tcp_port, udp_port, id: NodeId::random() }
}
fn rng_ipv6_record(rng: &mut impl RngCore) -> NodeRecord {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let address = IpAddr::V6(ip.into());
NodeRecord { address, tcp_port: rng.gen(), udp_port: rng.gen(), id: NodeId::random() }
}
fn rng_ipv4_record(rng: &mut impl RngCore) -> NodeRecord {
let mut ip = [0u8; 4];
rng.fill_bytes(&mut ip);
let address = IpAddr::V4(ip.into());
NodeRecord { address, tcp_port: rng.gen(), udp_port: rng.gen(), id: NodeId::random() }
}
fn rng_message(rng: &mut impl RngCore) -> Message {
match rng.gen_range(1..=4) {
1 => Message::Ping(Ping {
from: rng_endpoint(rng),
to: rng_endpoint(rng),
expire: rng.gen(),
}),
2 => Message::Pong(Pong {
to: rng_endpoint(rng),
echo: H256::random(),
expire: rng.gen(),
}),
3 => Message::FindNode(FindNode { id: NodeId::random(), expire: rng.gen() }),
4 => {
let num: usize = rng.gen_range(1..=SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS);
Message::Neighbours(Neighbours {
nodes: std::iter::repeat_with(|| rng_record(rng)).take(num).collect(),
expire: rng.gen(),
})
}
_ => unreachable!(),
}
}
#[test]
fn test_endpoint_ipv_v4() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 4];
rng.fill_bytes(&mut ip);
let msg = NodeEndpoint {
address: IpAddr::V4(ip.into()),
tcp_port: rng.gen(),
udp_port: rng.gen(),
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = NodeEndpoint::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_endpoint_ipv_64() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = NodeEndpoint {
address: IpAddr::V6(ip.into()),
tcp_port: rng.gen(),
udp_port: rng.gen(),
};
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = NodeEndpoint::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_ping_message() {
let mut rng = thread_rng();
for _ in 0..100 {
let mut ip = [0u8; 16];
rng.fill_bytes(&mut ip);
let msg = Ping { from: rng_endpoint(&mut rng), to: rng_endpoint(&mut rng), expire: 0 };
let mut buf = BytesMut::new();
msg.encode(&mut buf);
let decoded = Ping::decode(&mut buf.as_ref()).unwrap();
assert_eq!(msg, decoded);
}
}
#[test]
fn test_hash_mismatch() {
let mut rng = thread_rng();
let msg = rng_message(&mut rng);
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
let (buf, _) = msg.encode(&secret_key);
let mut buf = BytesMut::from(buf.as_ref());
buf.put_u8(0);
match Message::decode(buf.as_ref()).unwrap_err() {
DecodePacketError::HashMismatch => {}
err => {
unreachable!("unexpected err {}", err)
}
}
}
#[test]
fn neighbours_max_nodes() {
let mut rng = thread_rng();
for _ in 0..1000 {
let msg = Message::Neighbours(Neighbours {
nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng))
.take(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS)
.collect(),
expire: rng.gen(),
});
let (secret_key, _) = SECP256K1.generate_keypair(&mut rng);
let (encoded, _) = msg.encode(&secret_key);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg);
let mut neighbours = Neighbours {
nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng))
.take(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS - 1)
.collect(),
expire: rng.gen(),
};
neighbours.nodes.push(rng_ipv4_record(&mut rng));
let msg = Message::Neighbours(neighbours);
let (encoded, _) = msg.encode(&secret_key);
assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg);
}
}
#[test]
fn test_encode_decode_message() {
let mut rng = thread_rng();
for _ in 0..100 {
let msg = rng_message(&mut rng);
let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng);
let sender_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
let (buf, _) = msg.encode(&secret_key);
let packet = Message::decode(buf.as_ref()).unwrap();
assert_eq!(msg, packet.msg);
assert_eq!(sender_id, packet.node_id);
}
}
#[test]
fn decode_pong_packet() {
let packet = "2ad84c37327a06c2522cf7bc039621da89f68907441b755935bb308dc4cd17d6fe550e90329ad6a516ca7db18e08900067928a0dfa3b5c75d55a42c984497373698d98616662c048983ea85895ea2da765eabeb15525478384e106337bfd8ed50002f3c9843ed8cae682fd1c80a008ad4dead0922211df47593e7d837b2b23d13954285871ca23250ea594993ded84635690e5829670";
let data = hex::decode(packet).unwrap();
Message::decode(&data).unwrap();
}
#[test]
fn decode_ping_packet() {
let packet = "05ae5bf922cf2a93f97632a4ab0943dc252a0dab0c42d86dd62e5d91e1a0966e9b628fbf4763fdfbb928540460b797e6be2e7058a82f6083f6d2e7391bb021741459976d4152aa16bbee0c3609dcfac6668db1ef78b7ee9f8b4ced10dd5ae2900101df04cb8403d12d4f82765f82765fc9843ed8cae6828aa6808463569916829670";
let data = hex::decode(packet).unwrap();
Message::decode(&data).unwrap();
}
}

View File

@ -54,7 +54,7 @@ pub type StorageValue = H256;
// NOTE: There is a benefit of using wrapped Bytes as it gives us serde and debug // NOTE: There is a benefit of using wrapped Bytes as it gives us serde and debug
pub use ethers_core::{ pub use ethers_core::{
types as rpc, types as rpc,
types::{Bloom, Bytes, H128, H160, H256, H512, H64, U128, U256, U64}, types::{BigEndianHash, Bloom, Bytes, H128, H160, H256, H512, H64, U128, U256, U64},
}; };
#[doc(hidden)] #[doc(hidden)]