From 5721262740f66f4461036499172d880355c6838a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 7 Nov 2022 18:04:17 +0100 Subject: [PATCH] refactor(net): unify capability types (#168) * refactor(net): unify capability types * refactor(net): unify capability types --- Cargo.lock | 2 +- crates/net/eth-wire/Cargo.toml | 3 +- crates/net/eth-wire/src/capability.rs | 109 ++++++++++++++++++++++- crates/net/eth-wire/src/ethstream.rs | 14 +-- crates/net/eth-wire/src/lib.rs | 2 +- crates/net/eth-wire/src/p2pstream.rs | 39 ++------ crates/net/eth-wire/src/types/version.rs | 8 +- crates/net/network/Cargo.toml | 3 +- crates/net/network/src/manager.rs | 10 ++- crates/net/network/src/message.rs | 92 +------------------ crates/net/network/src/session/handle.rs | 2 +- crates/net/network/src/session/mod.rs | 6 +- crates/net/network/src/state.rs | 3 +- crates/net/network/src/swarm.rs | 2 +- 14 files changed, 142 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3b2bbacf..4822e4397 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3144,6 +3144,7 @@ dependencies = [ "reth-primitives", "reth-rlp", "secp256k1", + "smol_str", "snap", "thiserror", "tokio", @@ -3260,7 +3261,6 @@ dependencies = [ "reth-rlp-derive", "reth-transaction-pool", "secp256k1", - "smol_str", "thiserror", "tokio", "tokio-stream", diff --git a/crates/net/eth-wire/Cargo.toml b/crates/net/eth-wire/Cargo.toml index 3882176a3..e754bec02 100644 --- a/crates/net/eth-wire/Cargo.toml +++ b/crates/net/eth-wire/Cargo.toml @@ -15,7 +15,7 @@ thiserror = "1" # reth reth-ecies = { path = "../ecies" } reth-primitives = { path = "../../primitives" } -reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types"] } +reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types", "smol_str"] } #used for forkid crc = "1" @@ -29,6 +29,7 @@ pin-project = "1.0" pin-utils = "0.1.0" tracing = "0.1.37" snap = "1.0.5" +smol_str = { version = "0.1", default-features = false } [dev-dependencies] hex-literal = "0.3" diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index f67a384f9..ddede16b3 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -1,4 +1,107 @@ -use crate::{version::ParseVersionError, EthVersion}; +use crate::{version::ParseVersionError, EthMessage, EthVersion}; +use bytes::{BufMut, Bytes}; +use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable}; +use smol_str::SmolStr; + +/// A Capability message consisting of the message-id and the payload +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RawCapabilityMessage { + /// Identifier of the message. + pub id: usize, + /// Actual payload + pub payload: Bytes, +} + +/// Various protocol related event types bubbled up from a session that need to be handled by the +/// network. +#[derive(Debug)] +pub enum CapabilityMessage { + /// Eth sub-protocol message. + Eth(EthMessage), + /// Any other capability message. + Other(RawCapabilityMessage), +} + +/// A message indicating a supported capability and capability version. +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] +pub struct Capability { + /// The name of the subprotocol + pub name: SmolStr, + /// The version of the subprotocol + pub version: usize, +} + +impl Capability { + /// Create a new `Capability` with the given name and version. + pub fn new(name: SmolStr, version: usize) -> Self { + Self { name, version } + } + + /// Whether this is eth v66 protocol. + #[inline] + pub fn is_eth_v66(&self) -> bool { + self.name == "eth" && self.version == 66 + } + + /// Whether this is eth v67. + #[inline] + pub fn is_eth_v67(&self) -> bool { + self.name == "eth" && self.version == 67 + } +} + +/// Represents all capabilities of a node. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Capabilities { + /// All Capabilities and their versions + inner: Vec, + eth_66: bool, + eth_67: bool, +} + +impl Capabilities { + /// Returns all capabilities. + #[inline] + pub fn capabilities(&self) -> &[Capability] { + &self.inner + } + + /// Consumes the type and returns the all capabilities. + #[inline] + pub fn into_inner(self) -> Vec { + self.inner + } + + /// Whether this peer supports eth v66 protocol. + #[inline] + pub fn supports_eth_v66(&self) -> bool { + self.eth_66 + } + + /// Whether this peer supports eth v67 protocol. + #[inline] + pub fn supports_eth_v67(&self) -> bool { + self.eth_67 + } +} + +impl Encodable for Capabilities { + fn encode(&self, out: &mut dyn BufMut) { + self.inner.encode(out) + } +} + +impl Decodable for Capabilities { + fn decode(buf: &mut &[u8]) -> Result { + let inner = Vec::::decode(buf)?; + + Ok(Self { + eth_66: inner.iter().any(Capability::is_eth_v66), + eth_67: inner.iter().any(Capability::is_eth_v67), + inner, + }) + } +} /// This represents a shared capability, its version, and its offset. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -7,7 +110,7 @@ pub enum SharedCapability { Eth { version: EthVersion, offset: u8 }, /// An unknown capability. - UnknownCapability { name: String, version: u8, offset: u8 }, + UnknownCapability { name: SmolStr, version: u8, offset: u8 }, } impl SharedCapability { @@ -15,7 +118,7 @@ impl SharedCapability { pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result { match name { "eth" => Ok(Self::Eth { version: EthVersion::try_from(version)?, offset }), - _ => Ok(Self::UnknownCapability { name: name.to_string(), version, offset }), + _ => Ok(Self::UnknownCapability { name: name.into(), version, offset }), } } diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 29b4e9497..e7fbd205e 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -202,7 +202,7 @@ where #[cfg(test)] mod tests { use crate::{ - p2pstream::{CapabilityMessage, HelloMessage, ProtocolVersion, UnauthedP2PStream}, + p2pstream::{HelloMessage, ProtocolVersion, UnauthedP2PStream}, types::{broadcast::BlockHashNumber, forkid::ForkFilter, EthMessage, Status}, EthStream, PassthroughCodec, }; @@ -212,7 +212,7 @@ mod tests { use tokio::net::{TcpListener, TcpStream}; use tokio_util::codec::Decoder; - use crate::types::EthVersion; + use crate::{capability::Capability, types::EthVersion}; use ethers_core::types::Chain; use reth_primitives::{H256, U256}; @@ -378,10 +378,7 @@ mod tests { let server_hello = HelloMessage { protocol_version: ProtocolVersion::V5, client_version: "bitcoind/1.0.0".to_string(), - capabilities: vec![CapabilityMessage::new( - "eth".to_string(), - EthVersion::Eth67 as usize, - )], + capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)], port: 30303, id: pk2id(&server_key.public_key(SECP256K1)), }; @@ -409,10 +406,7 @@ mod tests { let client_hello = HelloMessage { protocol_version: ProtocolVersion::V5, client_version: "bitcoind/1.0.0".to_string(), - capabilities: vec![CapabilityMessage::new( - "eth".to_string(), - EthVersion::Eth67 as usize, - )], + capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)], port: 30303, id: pk2id(&client_key.public_key(SECP256K1)), }; diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index 583a0913b..2404aa9ea 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -11,7 +11,7 @@ pub use tokio_util::codec::{ LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError, }; -mod capability; +pub mod capability; pub mod error; mod ethstream; mod p2pstream; diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index a35fa2d79..27b148722 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -1,6 +1,6 @@ #![allow(dead_code, unreachable_pub, missing_docs, unused_variables)] use crate::{ - capability::SharedCapability, + capability::{Capability, SharedCapability}, error::{P2PHandshakeError, P2PStreamError}, pinger::{Pinger, PingerEvent}, }; @@ -313,8 +313,8 @@ where /// /// Currently only `eth` versions 66 and 67 are supported. pub fn set_capability_offsets( - local_capabilities: Vec, - peer_capabilities: Vec, + local_capabilities: Vec, + peer_capabilities: Vec, ) -> Result { // find intersection of capabilities let our_capabilities_map = @@ -513,22 +513,6 @@ impl TryFrom for P2PMessageID { } } -/// A message indicating a supported capability and capability version. -#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] -pub struct CapabilityMessage { - /// The name of the subprotocol - pub name: String, - /// The version of the subprotocol - pub version: usize, -} - -impl CapabilityMessage { - /// Create a new `CapabilityMessage` with the given name and version. - pub fn new(name: String, version: usize) -> Self { - Self { name, version } - } -} - // TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests /// Message used in the `p2p` handshake, containing information about the supported RLPx protocol /// version and capabilities. @@ -540,7 +524,7 @@ pub struct HelloMessage { /// "Ethereum(++)/1.0.0"). pub client_version: String, /// The list of supported capabilities and their versions. - pub capabilities: Vec, + pub capabilities: Vec, /// The port that the client is listening on, zero indicates the client is not listening. pub port: u16, /// The secp256k1 public key corresponding to the node's private key. @@ -860,10 +844,7 @@ mod tests { let hello = P2PMessage::Hello(HelloMessage { protocol_version: ProtocolVersion::V5, client_version: "reth/0.1.0".to_string(), - capabilities: vec![CapabilityMessage::new( - "eth".to_string(), - EthVersion::Eth67 as usize, - )], + capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)], port: 30303, id, }); @@ -883,10 +864,7 @@ mod tests { let hello = P2PMessage::Hello(HelloMessage { protocol_version: ProtocolVersion::V5, client_version: "reth/0.1.0".to_string(), - capabilities: vec![CapabilityMessage::new( - "eth".to_string(), - EthVersion::Eth67 as usize, - )], + capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)], port: 30303, id, }); @@ -905,10 +883,7 @@ mod tests { let hello = P2PMessage::Hello(HelloMessage { protocol_version: ProtocolVersion::V5, client_version: "reth/0.1.0".to_string(), - capabilities: vec![CapabilityMessage::new( - "eth".to_string(), - EthVersion::Eth67 as usize, - )], + capabilities: vec![Capability::new("eth".into(), EthVersion::Eth67 as usize)], port: 30303, id, }); diff --git a/crates/net/eth-wire/src/types/version.rs b/crates/net/eth-wire/src/types/version.rs index ec9930753..4735e0cd8 100644 --- a/crates/net/eth-wire/src/types/version.rs +++ b/crates/net/eth-wire/src/types/version.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use thiserror::Error; -use crate::p2pstream::CapabilityMessage; +use crate::capability::Capability; #[derive(Debug, Clone, PartialEq, Eq, Error)] #[error("Unknown eth protocol version: {0}")] @@ -101,10 +101,10 @@ impl From for &'static str { } } -impl From for CapabilityMessage { +impl From for Capability { #[inline] - fn from(v: EthVersion) -> CapabilityMessage { - CapabilityMessage { name: String::from("eth"), version: v as usize } + fn from(v: EthVersion) -> Capability { + Capability { name: "eth".into(), version: v as usize } } } diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 35f07d60a..dfe555aea 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -16,7 +16,7 @@ reth-primitives = { path = "../../primitives" } reth-discv4 = { path = "../discv4" } reth-eth-wire = { path = "../eth-wire" } reth-ecies = { path = "../ecies" } -reth-rlp = { path = "../../common/rlp", features = ["smol_str"] } +reth-rlp = { path = "../../common/rlp" } reth-rlp-derive = { path = "../../common/rlp-derive" } reth-transaction-pool = { path = "../../transaction-pool" } @@ -34,7 +34,6 @@ thiserror = "1.0" parking_lot = "0.12" async-trait = "0.1" bytes = "1.2" -smol_str = { version = "0.1", default-features = false } either = "1.8" secp256k1 = { version = "0.24", features = [ diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 22e618842..f03dc5d89 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -20,7 +20,6 @@ use crate::{ discovery::Discovery, error::NetworkError, listener::ConnectionListener, - message::{Capabilities, CapabilityMessage}, network::{NetworkHandle, NetworkHandleMessage}, peers::PeersManager, session::SessionManager, @@ -30,7 +29,10 @@ use crate::{ }; use futures::{Future, StreamExt}; use parking_lot::Mutex; -use reth_eth_wire::EthMessage; +use reth_eth_wire::{ + capability::{Capabilities, CapabilityMessage}, + EthMessage, +}; use reth_interfaces::provider::BlockProvider; use std::{ net::SocketAddr, @@ -122,7 +124,6 @@ where // need to retrieve the addr here since provided port could be `0` let local_node_id = discovery.local_id(); - // TODO this should also need sk for encrypted sessions let sessions = SessionManager::new(secret_key, sessions_config); let state = NetworkState::new(client, discovery, peers_manger); @@ -161,10 +162,11 @@ where /// Event hook for an unexpected message from the peer. fn on_invalid_message( &self, - _node_id: NodeId, + node_id: NodeId, _capabilities: Arc, _message: CapabilityMessage, ) { + trace!(?node_id, target = "net", "received unexpected message"); // TODO: disconnect? } diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index d98db200f..775391232 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -3,11 +3,8 @@ //! An RLPx stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the RLPx `Hello` message as pairs of `(id, version)`, -use bytes::{BufMut, Bytes}; -use reth_eth_wire::{BlockHeaders, EthMessage, GetBlockHeaders}; -use reth_rlp::{Decodable, DecodeError, Encodable}; -use reth_rlp_derive::{RlpDecodable, RlpEncodable}; -use smol_str::SmolStr; +use reth_eth_wire::{BlockHeaders, GetBlockHeaders}; + use tokio::sync::{mpsc, oneshot}; /// Result alias for result of a request. @@ -39,91 +36,6 @@ impl From for RequestError { } } -/// Represents all capabilities of a node. -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct Capabilities { - /// All Capabilities and their versions - inner: Vec, - eth_66: bool, - eth_67: bool, -} - -impl Capabilities { - /// Whether this peer supports eth v66 protocol. - #[inline] - pub fn supports_eth_v66(&self) -> bool { - self.eth_66 - } - - /// Whether this peer supports eth v67 protocol. - #[inline] - pub fn supports_eth_v67(&self) -> bool { - self.eth_67 - } -} - -impl Encodable for Capabilities { - fn encode(&self, out: &mut dyn BufMut) { - self.inner.encode(out) - } -} - -impl Decodable for Capabilities { - fn decode(buf: &mut &[u8]) -> Result { - let inner = Vec::::decode(buf)?; - - Ok(Self { - eth_66: inner.iter().any(Capability::is_eth_v66), - eth_67: inner.iter().any(Capability::is_eth_v67), - inner, - }) - } -} - -/// Represents an announced Capability in the `Hello` message -#[derive(Debug, Clone, Eq, PartialEq, RlpDecodable, RlpEncodable)] -pub struct Capability { - /// Name of the Capability - pub name: SmolStr, - /// The version of the capability - pub version: u64, -} - -// === impl Capability === - -impl Capability { - /// Whether this is eth v66 protocol. - #[inline] - pub fn is_eth_v66(&self) -> bool { - self.name == "eth" && self.version == 66 - } - - /// Whether this is eth v67. - #[inline] - pub fn is_eth_v67(&self) -> bool { - self.name == "eth" && self.version == 67 - } -} - -/// A Capability message consisting of the message-id and the payload -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct RawCapabilityMessage { - /// Identifier of the message. - pub id: usize, - /// Actual payload - pub payload: Bytes, -} - -/// Various protocol related event types bubbled up from a session that need to be handled by the -/// network. -#[derive(Debug)] -pub enum CapabilityMessage { - /// Eth sub-protocol message. - Eth(EthMessage), - /// Any other capability message. - Other(RawCapabilityMessage), -} - /// Protocol related request messages that expect a response #[derive(Debug)] pub enum CapabilityRequest { diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index e0509dfe7..01b227a83 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -1,10 +1,10 @@ //! Session handles use crate::{ - message::{Capabilities, CapabilityMessage}, session::{Direction, SessionId}, NodeId, }; use reth_ecies::{stream::ECIESStream, ECIESError}; +use reth_eth_wire::capability::{Capabilities, CapabilityMessage}; use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use tokio::{ net::TcpStream, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index d8ccd83ef..9a4bb9960 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,6 +1,5 @@ //! Support for handling peer sessions. use crate::{ - message::{Capabilities, CapabilityMessage}, session::handle::{ ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, }, @@ -10,7 +9,10 @@ use fnv::FnvHashMap; use futures::{future::Either, io, FutureExt, StreamExt}; pub use handle::PeerMessageSender; use reth_ecies::{stream::ECIESStream, ECIESError}; -use reth_eth_wire::UnauthedEthStream; +use reth_eth_wire::{ + capability::{Capabilities, CapabilityMessage}, + UnauthedEthStream, +}; use secp256k1::{SecretKey, SECP256K1}; use std::{ collections::HashMap, diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 4edc36922..80d581eba 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -3,13 +3,14 @@ use crate::{ discovery::{Discovery, DiscoveryEvent}, fetch::StateFetcher, - message::{Capabilities, CapabilityResponse}, + message::CapabilityResponse, peers::{PeerAction, PeersManager}, session::PeerMessageSender, NodeId, }; use futures::FutureExt; +use reth_eth_wire::capability::Capabilities; use reth_interfaces::provider::BlockProvider; use reth_primitives::{H256, U256}; use std::{ diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index c5e631678..aa7ab075c 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,12 +1,12 @@ use crate::{ listener::{ConnectionListener, ListenerEvent}, - message::{Capabilities, CapabilityMessage}, session::{SessionEvent, SessionId, SessionManager}, state::{NetworkState, StateAction}, NodeId, }; use futures::Stream; use reth_ecies::ECIESError; +use reth_eth_wire::capability::{Capabilities, CapabilityMessage}; use reth_interfaces::provider::BlockProvider; use std::{ io,