refactor(net): unify capability types (#168)

* refactor(net): unify capability types

* refactor(net): unify capability types
This commit is contained in:
Matthias Seitz
2022-11-07 18:04:17 +01:00
committed by GitHub
parent e870a4ce13
commit 5721262740
14 changed files with 142 additions and 153 deletions

2
Cargo.lock generated
View File

@ -3144,6 +3144,7 @@ dependencies = [
"reth-primitives", "reth-primitives",
"reth-rlp", "reth-rlp",
"secp256k1", "secp256k1",
"smol_str",
"snap", "snap",
"thiserror", "thiserror",
"tokio", "tokio",
@ -3260,7 +3261,6 @@ dependencies = [
"reth-rlp-derive", "reth-rlp-derive",
"reth-transaction-pool", "reth-transaction-pool",
"secp256k1", "secp256k1",
"smol_str",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",

View File

@ -15,7 +15,7 @@ thiserror = "1"
# reth # reth
reth-ecies = { path = "../ecies" } reth-ecies = { path = "../ecies" }
reth-primitives = { path = "../../primitives" } reth-primitives = { path = "../../primitives" }
reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types"] } reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types", "smol_str"] }
#used for forkid #used for forkid
crc = "1" crc = "1"
@ -29,6 +29,7 @@ pin-project = "1.0"
pin-utils = "0.1.0" pin-utils = "0.1.0"
tracing = "0.1.37" tracing = "0.1.37"
snap = "1.0.5" snap = "1.0.5"
smol_str = { version = "0.1", default-features = false }
[dev-dependencies] [dev-dependencies]
hex-literal = "0.3" hex-literal = "0.3"

View File

@ -1,4 +1,107 @@
use crate::{version::ParseVersionError, EthVersion}; use crate::{version::ParseVersionError, EthMessage, EthVersion};
use bytes::{BufMut, Bytes};
use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable};
use smol_str::SmolStr;
/// A Capability message consisting of the message-id and the payload
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RawCapabilityMessage {
/// Identifier of the message.
pub id: usize,
/// Actual payload
pub payload: Bytes,
}
/// Various protocol related event types bubbled up from a session that need to be handled by the
/// network.
#[derive(Debug)]
pub enum CapabilityMessage {
/// Eth sub-protocol message.
Eth(EthMessage),
/// Any other capability message.
Other(RawCapabilityMessage),
}
/// A message indicating a supported capability and capability version.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct Capability {
/// The name of the subprotocol
pub name: SmolStr,
/// The version of the subprotocol
pub version: usize,
}
impl Capability {
/// Create a new `Capability` with the given name and version.
pub fn new(name: SmolStr, version: usize) -> Self {
Self { name, version }
}
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
self.name == "eth" && self.version == 66
}
/// Whether this is eth v67.
#[inline]
pub fn is_eth_v67(&self) -> bool {
self.name == "eth" && self.version == 67
}
}
/// Represents all capabilities of a node.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Capabilities {
/// All Capabilities and their versions
inner: Vec<Capability>,
eth_66: bool,
eth_67: bool,
}
impl Capabilities {
/// Returns all capabilities.
#[inline]
pub fn capabilities(&self) -> &[Capability] {
&self.inner
}
/// Consumes the type and returns the all capabilities.
#[inline]
pub fn into_inner(self) -> Vec<Capability> {
self.inner
}
/// Whether this peer supports eth v66 protocol.
#[inline]
pub fn supports_eth_v66(&self) -> bool {
self.eth_66
}
/// Whether this peer supports eth v67 protocol.
#[inline]
pub fn supports_eth_v67(&self) -> bool {
self.eth_67
}
}
impl Encodable for Capabilities {
fn encode(&self, out: &mut dyn BufMut) {
self.inner.encode(out)
}
}
impl Decodable for Capabilities {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let inner = Vec::<Capability>::decode(buf)?;
Ok(Self {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
inner,
})
}
}
/// This represents a shared capability, its version, and its offset. /// This represents a shared capability, its version, and its offset.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -7,7 +110,7 @@ pub enum SharedCapability {
Eth { version: EthVersion, offset: u8 }, Eth { version: EthVersion, offset: u8 },
/// An unknown capability. /// An unknown capability.
UnknownCapability { name: String, version: u8, offset: u8 }, UnknownCapability { name: SmolStr, version: u8, offset: u8 },
} }
impl SharedCapability { impl SharedCapability {
@ -15,7 +118,7 @@ impl SharedCapability {
pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result<Self, SharedCapabilityError> { pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result<Self, SharedCapabilityError> {
match name { match name {
"eth" => Ok(Self::Eth { version: EthVersion::try_from(version)?, offset }), "eth" => Ok(Self::Eth { version: EthVersion::try_from(version)?, offset }),
_ => Ok(Self::UnknownCapability { name: name.to_string(), version, offset }), _ => Ok(Self::UnknownCapability { name: name.into(), version, offset }),
} }
} }

View File

@ -202,7 +202,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use crate::{
p2pstream::{CapabilityMessage, HelloMessage, ProtocolVersion, UnauthedP2PStream}, p2pstream::{HelloMessage, ProtocolVersion, UnauthedP2PStream},
types::{broadcast::BlockHashNumber, forkid::ForkFilter, EthMessage, Status}, types::{broadcast::BlockHashNumber, forkid::ForkFilter, EthMessage, Status},
EthStream, PassthroughCodec, EthStream, PassthroughCodec,
}; };
@ -212,7 +212,7 @@ mod tests {
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Decoder; use tokio_util::codec::Decoder;
use crate::types::EthVersion; use crate::{capability::Capability, types::EthVersion};
use ethers_core::types::Chain; use ethers_core::types::Chain;
use reth_primitives::{H256, U256}; use reth_primitives::{H256, U256};
@ -378,10 +378,7 @@ mod tests {
let server_hello = HelloMessage { let server_hello = HelloMessage {
protocol_version: ProtocolVersion::V5, protocol_version: ProtocolVersion::V5,
client_version: "bitcoind/1.0.0".to_string(), client_version: "bitcoind/1.0.0".to_string(),
capabilities: vec![CapabilityMessage::new( capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)],
"eth".to_string(),
EthVersion::Eth67 as usize,
)],
port: 30303, port: 30303,
id: pk2id(&server_key.public_key(SECP256K1)), id: pk2id(&server_key.public_key(SECP256K1)),
}; };
@ -409,10 +406,7 @@ mod tests {
let client_hello = HelloMessage { let client_hello = HelloMessage {
protocol_version: ProtocolVersion::V5, protocol_version: ProtocolVersion::V5,
client_version: "bitcoind/1.0.0".to_string(), client_version: "bitcoind/1.0.0".to_string(),
capabilities: vec![CapabilityMessage::new( capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)],
"eth".to_string(),
EthVersion::Eth67 as usize,
)],
port: 30303, port: 30303,
id: pk2id(&client_key.public_key(SECP256K1)), id: pk2id(&client_key.public_key(SECP256K1)),
}; };

View File

@ -11,7 +11,7 @@
pub use tokio_util::codec::{ pub use tokio_util::codec::{
LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError, LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError,
}; };
mod capability; pub mod capability;
pub mod error; pub mod error;
mod ethstream; mod ethstream;
mod p2pstream; mod p2pstream;

View File

@ -1,6 +1,6 @@
#![allow(dead_code, unreachable_pub, missing_docs, unused_variables)] #![allow(dead_code, unreachable_pub, missing_docs, unused_variables)]
use crate::{ use crate::{
capability::SharedCapability, capability::{Capability, SharedCapability},
error::{P2PHandshakeError, P2PStreamError}, error::{P2PHandshakeError, P2PStreamError},
pinger::{Pinger, PingerEvent}, pinger::{Pinger, PingerEvent},
}; };
@ -313,8 +313,8 @@ where
/// ///
/// Currently only `eth` versions 66 and 67 are supported. /// Currently only `eth` versions 66 and 67 are supported.
pub fn set_capability_offsets( pub fn set_capability_offsets(
local_capabilities: Vec<CapabilityMessage>, local_capabilities: Vec<Capability>,
peer_capabilities: Vec<CapabilityMessage>, peer_capabilities: Vec<Capability>,
) -> Result<SharedCapability, P2PStreamError> { ) -> Result<SharedCapability, P2PStreamError> {
// find intersection of capabilities // find intersection of capabilities
let our_capabilities_map = let our_capabilities_map =
@ -513,22 +513,6 @@ impl TryFrom<u8> for P2PMessageID {
} }
} }
/// A message indicating a supported capability and capability version.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct CapabilityMessage {
/// The name of the subprotocol
pub name: String,
/// The version of the subprotocol
pub version: usize,
}
impl CapabilityMessage {
/// Create a new `CapabilityMessage` with the given name and version.
pub fn new(name: String, version: usize) -> Self {
Self { name, version }
}
}
// TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests // TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests
/// Message used in the `p2p` handshake, containing information about the supported RLPx protocol /// Message used in the `p2p` handshake, containing information about the supported RLPx protocol
/// version and capabilities. /// version and capabilities.
@ -540,7 +524,7 @@ pub struct HelloMessage {
/// "Ethereum(++)/1.0.0"). /// "Ethereum(++)/1.0.0").
pub client_version: String, pub client_version: String,
/// The list of supported capabilities and their versions. /// The list of supported capabilities and their versions.
pub capabilities: Vec<CapabilityMessage>, pub capabilities: Vec<Capability>,
/// The port that the client is listening on, zero indicates the client is not listening. /// The port that the client is listening on, zero indicates the client is not listening.
pub port: u16, pub port: u16,
/// The secp256k1 public key corresponding to the node's private key. /// The secp256k1 public key corresponding to the node's private key.
@ -860,10 +844,7 @@ mod tests {
let hello = P2PMessage::Hello(HelloMessage { let hello = P2PMessage::Hello(HelloMessage {
protocol_version: ProtocolVersion::V5, protocol_version: ProtocolVersion::V5,
client_version: "reth/0.1.0".to_string(), client_version: "reth/0.1.0".to_string(),
capabilities: vec![CapabilityMessage::new( capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)],
"eth".to_string(),
EthVersion::Eth67 as usize,
)],
port: 30303, port: 30303,
id, id,
}); });
@ -883,10 +864,7 @@ mod tests {
let hello = P2PMessage::Hello(HelloMessage { let hello = P2PMessage::Hello(HelloMessage {
protocol_version: ProtocolVersion::V5, protocol_version: ProtocolVersion::V5,
client_version: "reth/0.1.0".to_string(), client_version: "reth/0.1.0".to_string(),
capabilities: vec![CapabilityMessage::new( capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)],
"eth".to_string(),
EthVersion::Eth67 as usize,
)],
port: 30303, port: 30303,
id, id,
}); });
@ -905,10 +883,7 @@ mod tests {
let hello = P2PMessage::Hello(HelloMessage { let hello = P2PMessage::Hello(HelloMessage {
protocol_version: ProtocolVersion::V5, protocol_version: ProtocolVersion::V5,
client_version: "reth/0.1.0".to_string(), client_version: "reth/0.1.0".to_string(),
capabilities: vec![CapabilityMessage::new( capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)],
"eth".to_string(),
EthVersion::Eth67 as usize,
)],
port: 30303, port: 30303,
id, id,
}); });

View File

@ -1,7 +1,7 @@
use std::str::FromStr; use std::str::FromStr;
use thiserror::Error; use thiserror::Error;
use crate::p2pstream::CapabilityMessage; use crate::capability::Capability;
#[derive(Debug, Clone, PartialEq, Eq, Error)] #[derive(Debug, Clone, PartialEq, Eq, Error)]
#[error("Unknown eth protocol version: {0}")] #[error("Unknown eth protocol version: {0}")]
@ -101,10 +101,10 @@ impl From<EthVersion> for &'static str {
} }
} }
impl From<EthVersion> for CapabilityMessage { impl From<EthVersion> for Capability {
#[inline] #[inline]
fn from(v: EthVersion) -> CapabilityMessage { fn from(v: EthVersion) -> Capability {
CapabilityMessage { name: String::from("eth"), version: v as usize } Capability { name: "eth".into(), version: v as usize }
} }
} }

View File

@ -16,7 +16,7 @@ reth-primitives = { path = "../../primitives" }
reth-discv4 = { path = "../discv4" } reth-discv4 = { path = "../discv4" }
reth-eth-wire = { path = "../eth-wire" } reth-eth-wire = { path = "../eth-wire" }
reth-ecies = { path = "../ecies" } reth-ecies = { path = "../ecies" }
reth-rlp = { path = "../../common/rlp", features = ["smol_str"] } reth-rlp = { path = "../../common/rlp" }
reth-rlp-derive = { path = "../../common/rlp-derive" } reth-rlp-derive = { path = "../../common/rlp-derive" }
reth-transaction-pool = { path = "../../transaction-pool" } reth-transaction-pool = { path = "../../transaction-pool" }
@ -34,7 +34,6 @@ thiserror = "1.0"
parking_lot = "0.12" parking_lot = "0.12"
async-trait = "0.1" async-trait = "0.1"
bytes = "1.2" bytes = "1.2"
smol_str = { version = "0.1", default-features = false }
either = "1.8" either = "1.8"
secp256k1 = { version = "0.24", features = [ secp256k1 = { version = "0.24", features = [

View File

@ -20,7 +20,6 @@ use crate::{
discovery::Discovery, discovery::Discovery,
error::NetworkError, error::NetworkError,
listener::ConnectionListener, listener::ConnectionListener,
message::{Capabilities, CapabilityMessage},
network::{NetworkHandle, NetworkHandleMessage}, network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager, peers::PeersManager,
session::SessionManager, session::SessionManager,
@ -30,7 +29,10 @@ use crate::{
}; };
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use reth_eth_wire::EthMessage; use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
EthMessage,
};
use reth_interfaces::provider::BlockProvider; use reth_interfaces::provider::BlockProvider;
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
@ -122,7 +124,6 @@ where
// need to retrieve the addr here since provided port could be `0` // need to retrieve the addr here since provided port could be `0`
let local_node_id = discovery.local_id(); let local_node_id = discovery.local_id();
// TODO this should also need sk for encrypted sessions
let sessions = SessionManager::new(secret_key, sessions_config); let sessions = SessionManager::new(secret_key, sessions_config);
let state = NetworkState::new(client, discovery, peers_manger); let state = NetworkState::new(client, discovery, peers_manger);
@ -161,10 +162,11 @@ where
/// Event hook for an unexpected message from the peer. /// Event hook for an unexpected message from the peer.
fn on_invalid_message( fn on_invalid_message(
&self, &self,
_node_id: NodeId, node_id: NodeId,
_capabilities: Arc<Capabilities>, _capabilities: Arc<Capabilities>,
_message: CapabilityMessage, _message: CapabilityMessage,
) { ) {
trace!(?node_id, target = "net", "received unexpected message");
// TODO: disconnect? // TODO: disconnect?
} }

View File

@ -3,11 +3,8 @@
//! An RLPx stream is multiplexed via the prepended message-id of a framed message. //! An RLPx stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging> //! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use bytes::{BufMut, Bytes}; use reth_eth_wire::{BlockHeaders, GetBlockHeaders};
use reth_eth_wire::{BlockHeaders, EthMessage, GetBlockHeaders};
use reth_rlp::{Decodable, DecodeError, Encodable};
use reth_rlp_derive::{RlpDecodable, RlpEncodable};
use smol_str::SmolStr;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
/// Result alias for result of a request. /// Result alias for result of a request.
@ -39,91 +36,6 @@ impl From<oneshot::error::RecvError> for RequestError {
} }
} }
/// Represents all capabilities of a node.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Capabilities {
/// All Capabilities and their versions
inner: Vec<Capability>,
eth_66: bool,
eth_67: bool,
}
impl Capabilities {
/// Whether this peer supports eth v66 protocol.
#[inline]
pub fn supports_eth_v66(&self) -> bool {
self.eth_66
}
/// Whether this peer supports eth v67 protocol.
#[inline]
pub fn supports_eth_v67(&self) -> bool {
self.eth_67
}
}
impl Encodable for Capabilities {
fn encode(&self, out: &mut dyn BufMut) {
self.inner.encode(out)
}
}
impl Decodable for Capabilities {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
let inner = Vec::<Capability>::decode(buf)?;
Ok(Self {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
inner,
})
}
}
/// Represents an announced Capability in the `Hello` message
#[derive(Debug, Clone, Eq, PartialEq, RlpDecodable, RlpEncodable)]
pub struct Capability {
/// Name of the Capability
pub name: SmolStr,
/// The version of the capability
pub version: u64,
}
// === impl Capability ===
impl Capability {
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
self.name == "eth" && self.version == 66
}
/// Whether this is eth v67.
#[inline]
pub fn is_eth_v67(&self) -> bool {
self.name == "eth" && self.version == 67
}
}
/// A Capability message consisting of the message-id and the payload
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RawCapabilityMessage {
/// Identifier of the message.
pub id: usize,
/// Actual payload
pub payload: Bytes,
}
/// Various protocol related event types bubbled up from a session that need to be handled by the
/// network.
#[derive(Debug)]
pub enum CapabilityMessage {
/// Eth sub-protocol message.
Eth(EthMessage),
/// Any other capability message.
Other(RawCapabilityMessage),
}
/// Protocol related request messages that expect a response /// Protocol related request messages that expect a response
#[derive(Debug)] #[derive(Debug)]
pub enum CapabilityRequest { pub enum CapabilityRequest {

View File

@ -1,10 +1,10 @@
//! Session handles //! Session handles
use crate::{ use crate::{
message::{Capabilities, CapabilityMessage},
session::{Direction, SessionId}, session::{Direction, SessionId},
NodeId, NodeId,
}; };
use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{ use tokio::{
net::TcpStream, net::TcpStream,

View File

@ -1,6 +1,5 @@
//! Support for handling peer sessions. //! Support for handling peer sessions.
use crate::{ use crate::{
message::{Capabilities, CapabilityMessage},
session::handle::{ session::handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
}, },
@ -10,7 +9,10 @@ use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt}; use futures::{future::Either, io, FutureExt, StreamExt};
pub use handle::PeerMessageSender; pub use handle::PeerMessageSender;
use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::UnauthedEthStream; use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
UnauthedEthStream,
};
use secp256k1::{SecretKey, SECP256K1}; use secp256k1::{SecretKey, SECP256K1};
use std::{ use std::{
collections::HashMap, collections::HashMap,

View File

@ -3,13 +3,14 @@
use crate::{ use crate::{
discovery::{Discovery, DiscoveryEvent}, discovery::{Discovery, DiscoveryEvent},
fetch::StateFetcher, fetch::StateFetcher,
message::{Capabilities, CapabilityResponse}, message::CapabilityResponse,
peers::{PeerAction, PeersManager}, peers::{PeerAction, PeersManager},
session::PeerMessageSender, session::PeerMessageSender,
NodeId, NodeId,
}; };
use futures::FutureExt; use futures::FutureExt;
use reth_eth_wire::capability::Capabilities;
use reth_interfaces::provider::BlockProvider; use reth_interfaces::provider::BlockProvider;
use reth_primitives::{H256, U256}; use reth_primitives::{H256, U256};
use std::{ use std::{

View File

@ -1,12 +1,12 @@
use crate::{ use crate::{
listener::{ConnectionListener, ListenerEvent}, listener::{ConnectionListener, ListenerEvent},
message::{Capabilities, CapabilityMessage},
session::{SessionEvent, SessionId, SessionManager}, session::{SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction}, state::{NetworkState, StateAction},
NodeId, NodeId,
}; };
use futures::Stream; use futures::Stream;
use reth_ecies::ECIESError; use reth_ecies::ECIESError;
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use reth_interfaces::provider::BlockProvider; use reth_interfaces::provider::BlockProvider;
use std::{ use std::{
io, io,