mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: support multiple shared caps (#5363)
Co-authored-by: Emilia Hane <emiliaha95@gmail.com>
This commit is contained in:
@ -1,6 +1,8 @@
|
||||
//! All capability related types
|
||||
|
||||
use crate::{version::ParseVersionError, EthMessage, EthVersion};
|
||||
use crate::{
|
||||
p2pstream::MAX_RESERVED_MESSAGE_ID, version::ParseVersionError, EthMessage, EthVersion,
|
||||
};
|
||||
use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable};
|
||||
use reth_codecs::add_arbitrary_tests;
|
||||
use reth_primitives::bytes::{BufMut, Bytes};
|
||||
@ -227,6 +229,11 @@ impl SharedCapability {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the capability is eth.
|
||||
pub fn is_eth(&self) -> bool {
|
||||
matches!(self, SharedCapability::Eth { .. })
|
||||
}
|
||||
|
||||
/// Returns the version of the capability.
|
||||
pub fn version(&self) -> u8 {
|
||||
match self {
|
||||
@ -243,6 +250,12 @@ impl SharedCapability {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the message ID offset of the current capability relative to the start of the
|
||||
/// capability message ID suffix.
|
||||
pub fn offset_rel_caps_suffix(&self) -> u8 {
|
||||
self.offset() - MAX_RESERVED_MESSAGE_ID - 1
|
||||
}
|
||||
|
||||
/// Returns the number of protocol messages supported by this capability.
|
||||
pub fn num_messages(&self) -> Result<u8, SharedCapabilityError> {
|
||||
match self {
|
||||
|
||||
@ -30,7 +30,9 @@ pub enum P2PStreamError {
|
||||
#[error("ping timed out with")]
|
||||
PingTimeout,
|
||||
#[error(transparent)]
|
||||
ParseVersionError(#[from] SharedCapabilityError),
|
||||
ParseSharedCapability(#[from] SharedCapabilityError),
|
||||
#[error("capability not supported on stream to this peer")]
|
||||
CapabilityNotShared,
|
||||
#[error("mismatched protocol version in Hello message: {0}")]
|
||||
MismatchedProtocolVersion(GotExpected<ProtocolVersion>),
|
||||
#[error("started ping task before the handshake completed")]
|
||||
|
||||
@ -34,7 +34,7 @@ const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024;
|
||||
|
||||
/// [`MAX_RESERVED_MESSAGE_ID`] is the maximum message ID reserved for the `p2p` subprotocol. If
|
||||
/// there are any incoming messages with an ID greater than this, they are subprotocol messages.
|
||||
const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f;
|
||||
pub const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f;
|
||||
|
||||
/// [`MAX_P2P_MESSAGE_ID`] is the maximum message ID in use for the `p2p` subprotocol.
|
||||
const MAX_P2P_MESSAGE_ID: u8 = P2PMessageID::Pong as u8;
|
||||
@ -159,7 +159,7 @@ where
|
||||
|
||||
// determine shared capabilities (currently returns only one capability)
|
||||
let capability_res =
|
||||
set_capability_offsets(hello.capabilities, their_hello.capabilities.clone());
|
||||
SharedCapabilities::try_new(hello.capabilities, their_hello.capabilities.clone());
|
||||
|
||||
let shared_capability = match capability_res {
|
||||
Err(err) => {
|
||||
@ -207,6 +207,27 @@ where
|
||||
|
||||
/// A P2PStream wraps over any `Stream` that yields bytes and makes it compatible with `p2p`
|
||||
/// protocol messages.
|
||||
///
|
||||
/// This stream supports multiple shared capabilities, that were negotiated during the handshake.
|
||||
///
|
||||
/// ### Message-ID based multiplexing
|
||||
///
|
||||
/// > Each capability is given as much of the message-ID space as it needs. All such capabilities
|
||||
/// > must statically specify how many message IDs they require. On connection and reception of the
|
||||
/// > Hello message, both peers have equivalent information about what capabilities they share
|
||||
/// > (including versions) and are able to form consensus over the composition of message ID space.
|
||||
///
|
||||
/// > Message IDs are assumed to be compact from ID 0x10 onwards (0x00-0x0f is reserved for the
|
||||
/// > "p2p" capability) and given to each shared (equal-version, equal-name) capability in
|
||||
/// > alphabetic order. Capability names are case-sensitive. Capabilities which are not shared are
|
||||
/// > ignored. If multiple versions are shared of the same (equal name) capability, the numerically
|
||||
/// > highest wins, others are ignored.
|
||||
///
|
||||
/// See also <https://github.com/ethereum/devp2p/blob/master/rlpx.md#message-id-based-multiplexing>
|
||||
///
|
||||
/// This stream emits Bytes that start with the normalized message id, so that the first byte of
|
||||
/// each message starts from 0. If this stream only supports a single capability, for example `eth`
|
||||
/// then the first byte of each message will match [EthMessageID](crate::types::EthMessageID).
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct P2PStream<S> {
|
||||
@ -223,7 +244,7 @@ pub struct P2PStream<S> {
|
||||
pinger: Pinger,
|
||||
|
||||
/// The supported capability for this stream.
|
||||
shared_capability: SharedCapability,
|
||||
shared_capabilities: SharedCapabilities,
|
||||
|
||||
/// Outgoing messages buffered for sending to the underlying stream.
|
||||
outgoing_messages: VecDeque<Bytes>,
|
||||
@ -241,13 +262,13 @@ impl<S> P2PStream<S> {
|
||||
/// Create a new [`P2PStream`] from the provided stream.
|
||||
/// New [`P2PStream`]s are assumed to have completed the `p2p` handshake successfully and are
|
||||
/// ready to send and receive subprotocol messages.
|
||||
pub fn new(inner: S, capability: SharedCapability) -> Self {
|
||||
pub fn new(inner: S, shared_capabilities: SharedCapabilities) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
encoder: snap::raw::Encoder::new(),
|
||||
decoder: snap::raw::Decoder::new(),
|
||||
pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT),
|
||||
shared_capability: capability,
|
||||
shared_capabilities,
|
||||
outgoing_messages: VecDeque::new(),
|
||||
outgoing_message_buffer_capacity: MAX_P2P_CAPACITY,
|
||||
disconnecting: false,
|
||||
@ -268,9 +289,12 @@ impl<S> P2PStream<S> {
|
||||
self.outgoing_message_buffer_capacity = capacity;
|
||||
}
|
||||
|
||||
/// Returns the shared capability for this stream.
|
||||
pub fn shared_capability(&self) -> &SharedCapability {
|
||||
&self.shared_capability
|
||||
/// Returns the shared capabilities for this stream.
|
||||
///
|
||||
/// This includes all the shared capabilities that were negotiated during the handshake and
|
||||
/// their offsets based on the number of messages of each capability.
|
||||
pub fn shared_capabilities(&self) -> &SharedCapabilities {
|
||||
&self.shared_capabilities
|
||||
}
|
||||
|
||||
/// Returns `true` if the connection is about to disconnect.
|
||||
@ -460,7 +484,7 @@ where
|
||||
// * `eth/67` is reserved message IDs 0x10 - 0x19.
|
||||
// * `qrs/65` is reserved message IDs 0x1a - 0x21.
|
||||
//
|
||||
decompress_buf[0] = bytes[0] - this.shared_capability.offset();
|
||||
decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1;
|
||||
|
||||
return Poll::Ready(Some(Ok(decompress_buf)))
|
||||
}
|
||||
@ -539,7 +563,7 @@ where
|
||||
|
||||
// all messages sent in this stream are subprotocol messages, so we need to switch the
|
||||
// message id based on the offset
|
||||
compressed[0] = item[0] + this.shared_capability.offset();
|
||||
compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1;
|
||||
this.outgoing_messages.push_back(compressed.freeze());
|
||||
|
||||
Ok(())
|
||||
@ -571,16 +595,50 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Non-empty ordered list of recognized shared capabilities.
|
||||
#[derive(Debug)]
|
||||
pub struct SharedCapabilities(Vec<SharedCapability>);
|
||||
|
||||
impl SharedCapabilities {
|
||||
/// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance.
|
||||
pub fn try_new(
|
||||
local_capabilities: Vec<Capability>,
|
||||
peer_capabilities: Vec<Capability>,
|
||||
) -> Result<Self, P2PStreamError> {
|
||||
Ok(Self(set_capability_offsets(local_capabilities, peer_capabilities)?))
|
||||
}
|
||||
|
||||
/// Iterates over the shared capabilities.
|
||||
pub fn iter_caps(&self) -> impl Iterator<Item = &SharedCapability> {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
/// Returns the eth capability if it is shared.
|
||||
pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> {
|
||||
for cap in self.iter_caps() {
|
||||
if cap.is_eth() {
|
||||
return Ok(cap)
|
||||
}
|
||||
}
|
||||
Err(P2PStreamError::CapabilityNotShared)
|
||||
}
|
||||
|
||||
/// Returns the negotiated eth version if it is shared.
|
||||
pub fn eth_version(&self) -> Result<u8, P2PStreamError> {
|
||||
self.eth().map(|cap| cap.version())
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines the offsets for each shared capability between the input list of peer
|
||||
/// capabilities and the input list of locally supported capabilities.
|
||||
///
|
||||
/// Currently only `eth` versions 66 and 67 are supported.
|
||||
/// Currently only `eth` versions 66, 67, 68 are supported.
|
||||
/// Additionally, the `p2p` capability version 5 is supported, but is
|
||||
/// expected _not_ to be in neither `local_capabilities` or `peer_capabilities`.
|
||||
pub fn set_capability_offsets(
|
||||
local_capabilities: Vec<Capability>,
|
||||
peer_capabilities: Vec<Capability>,
|
||||
) -> Result<SharedCapability, P2PStreamError> {
|
||||
) -> Result<Vec<SharedCapability>, P2PStreamError> {
|
||||
// find intersection of capabilities
|
||||
let our_capabilities = local_capabilities.into_iter().collect::<HashSet<_>>();
|
||||
|
||||
@ -597,8 +655,7 @@ pub fn set_capability_offsets(
|
||||
// This would cause the peers to send messages with the wrong message id, which is usually a
|
||||
// protocol violation.
|
||||
//
|
||||
// The `Ord` implementation for `SmolStr` (used here) currently delegates to rust's `Ord`
|
||||
// implementation for `str`, which also orders strings lexicographically.
|
||||
// The `Ord` implementation for `str` orders strings lexicographically.
|
||||
let mut shared_capability_names = BTreeSet::new();
|
||||
|
||||
// find highest shared version of each shared capability
|
||||
@ -640,6 +697,8 @@ pub fn set_capability_offsets(
|
||||
SharedCapability::UnknownCapability { .. } => {
|
||||
// Capabilities which are not shared are ignored
|
||||
debug!("unknown capability: name={:?}, version={}", name, version,);
|
||||
|
||||
// TODO(mattsse): track shared caps
|
||||
}
|
||||
SharedCapability::Eth { .. } => {
|
||||
// increment the offset if the capability is known
|
||||
@ -650,17 +709,11 @@ pub fn set_capability_offsets(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: support multiple capabilities - we would need a new Stream type to go on top of
|
||||
// `P2PStream` containing its capability. `P2PStream` would still send pings and handle
|
||||
// pongs, but instead contain a map of capabilities to their respective stream / channel.
|
||||
// Each channel would be responsible for containing the offset for that stream and would
|
||||
// only increment / decrement message IDs.
|
||||
// NOTE: since the `P2PStream` currently only supports one capability, we set the
|
||||
// capability with the lowest offset.
|
||||
Ok(shared_with_offsets
|
||||
.first()
|
||||
.ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))?
|
||||
.clone())
|
||||
if shared_with_offsets.is_empty() {
|
||||
return Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
|
||||
}
|
||||
|
||||
Ok(shared_with_offsets)
|
||||
}
|
||||
|
||||
/// This represents only the reserved `p2p` subprotocol messages.
|
||||
@ -928,7 +981,7 @@ mod tests {
|
||||
|
||||
// ensure that the two share a single capability, eth67
|
||||
assert_eq!(
|
||||
p2p_stream.shared_capability,
|
||||
*p2p_stream.shared_capabilities.iter_caps().next().unwrap(),
|
||||
SharedCapability::Eth {
|
||||
version: EthVersion::Eth67,
|
||||
offset: MAX_RESERVED_MESSAGE_ID + 1
|
||||
@ -946,7 +999,7 @@ mod tests {
|
||||
|
||||
// ensure that the two share a single capability, eth67
|
||||
assert_eq!(
|
||||
p2p_stream.shared_capability,
|
||||
*p2p_stream.shared_capabilities.iter_caps().next().unwrap(),
|
||||
SharedCapability::Eth {
|
||||
version: EthVersion::Eth67,
|
||||
offset: MAX_RESERVED_MESSAGE_ID + 1
|
||||
@ -1019,7 +1072,7 @@ mod tests {
|
||||
let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];
|
||||
|
||||
let shared_capability =
|
||||
set_capability_offsets(local_capabilities, peer_capabilities).unwrap();
|
||||
set_capability_offsets(local_capabilities, peer_capabilities).unwrap()[0].clone();
|
||||
|
||||
assert_eq!(
|
||||
shared_capability,
|
||||
|
||||
@ -12,6 +12,10 @@ use std::{fmt::Debug, sync::Arc};
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
|
||||
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
|
||||
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
|
||||
|
||||
/// An `eth` protocol message, containing a message ID and payload.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
|
||||
@ -2,10 +2,9 @@
|
||||
|
||||
use crate::capability::Capability;
|
||||
use std::str::FromStr;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Error thrown when failed to parse a valid [`EthVersion`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
|
||||
#[error("Unknown eth protocol version: {0}")]
|
||||
pub struct ParseVersionError(String);
|
||||
|
||||
|
||||
@ -154,7 +154,8 @@ impl SessionError for EthStreamError {
|
||||
)) |
|
||||
P2PStreamError::UnknownReservedMessageId(_) |
|
||||
P2PStreamError::EmptyProtocolMessage |
|
||||
P2PStreamError::ParseVersionError(_) |
|
||||
P2PStreamError::ParseSharedCapability(_) |
|
||||
P2PStreamError::CapabilityNotShared |
|
||||
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
|
||||
P2PStreamError::Disconnected(
|
||||
DisconnectReason::IncompatibleP2PProtocolVersion
|
||||
|
||||
@ -46,6 +46,7 @@ pub use handle::{
|
||||
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
|
||||
SessionCommand,
|
||||
};
|
||||
|
||||
pub use reth_network_api::{Direction, PeerInfo};
|
||||
|
||||
/// Internal identifier for active sessions.
|
||||
@ -912,10 +913,23 @@ async fn authenticate_stream(
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure we negotiated eth protocol
|
||||
let version = match p2p_stream.shared_capabilities().eth_version() {
|
||||
Ok(version) => version,
|
||||
Err(err) => {
|
||||
return PendingSessionEvent::Disconnected {
|
||||
remote_addr,
|
||||
session_id,
|
||||
direction,
|
||||
error: Some(err.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// if the hello handshake was successful we can try status handshake
|
||||
//
|
||||
// Before trying status handshake, set up the version to shared_capability
|
||||
let status = Status { version: p2p_stream.shared_capability().version(), ..status };
|
||||
let status = Status { version, ..status };
|
||||
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
|
||||
let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
|
||||
Ok(stream_res) => stream_res,
|
||||
|
||||
@ -105,7 +105,7 @@ async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthSt
|
||||
.forkid(Hardfork::Shanghai.fork_id(&MAINNET).unwrap())
|
||||
.build();
|
||||
|
||||
let status = Status { version: p2p_stream.shared_capability().version(), ..status };
|
||||
let status = Status { version: p2p_stream.shared_capabilities().eth()?.version(), ..status };
|
||||
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
|
||||
Ok(eth_unauthed.handshake(status, fork_filter).await?)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user