diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 1e1bb1b20..0dc9119ce 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -23,7 +23,7 @@ use std::{ pub struct RawCapabilityMessage { /// Identifier of the message. pub id: usize, - /// Actual payload + /// Actual __encoded__ payload pub payload: Bytes, } diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 77266c1b7..ccc80594b 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -1,4 +1,5 @@ use crate::{ + capability::RawCapabilityMessage, errors::{EthHandshakeError, EthStreamError}, message::{EthBroadcastMessage, ProtocolBroadcastMessage}, p2pstream::HANDSHAKE_TIMEOUT, @@ -6,6 +7,7 @@ use crate::{ Status, }; use alloy_primitives::bytes::{Bytes, BytesMut}; +use alloy_rlp::Encodable; use futures::{ready, Sink, SinkExt, StreamExt}; use pin_project::pin_project; use reth_eth_wire_types::NetworkPrimitives; @@ -252,6 +254,16 @@ where Ok(()) } + + /// Sends a raw capability message directly over the stream + pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> { + let mut bytes = Vec::new(); + msg.id.encode(&mut bytes); + bytes.extend_from_slice(&msg.payload); + + self.inner.start_send_unpin(bytes.into())?; + Ok(()) + } } impl Stream for EthStream @@ -361,13 +373,15 @@ mod tests { use crate::{ broadcast::BlockHashNumber, errors::{EthHandshakeError, EthStreamError}, + ethstream::RawCapabilityMessage, hello::DEFAULT_TCP_PORT, p2pstream::UnauthedP2PStream, EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec, ProtocolVersion, Status, }; use alloy_chains::NamedChain; - use alloy_primitives::{B256, U256}; + use alloy_primitives::{bytes::Bytes, B256, U256}; + use alloy_rlp::Decodable; use futures::{SinkExt, StreamExt}; use reth_ecies::stream::ECIESStream; use reth_eth_wire_types::EthNetworkPrimitives; @@ -743,4 +757,39 @@ mod tests { matches!(handshake_result, Err(e) if e.to_string() == EthStreamError::StreamTimeout.to_string()) ); } + + #[tokio::test] + async fn can_write_and_read_raw_capability() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + + let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) }; + + let test_msg_clone = test_msg.clone(); + let handle = tokio::spawn(async move { + let (incoming, _) = listener.accept().await.unwrap(); + let stream = PassthroughCodec::default().framed(incoming); + let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream); + + let bytes = stream.inner_mut().next().await.unwrap().unwrap(); + + // Create a cursor to track position while decoding + let mut id_bytes = &bytes[..]; + let decoded_id = ::decode(&mut id_bytes).unwrap(); + assert_eq!(decoded_id, test_msg_clone.id); + + // Get remaining bytes after ID decoding + let remaining = id_bytes; + assert_eq!(remaining, &test_msg_clone.payload[..]); + }); + + let outgoing = TcpStream::connect(local_addr).await.unwrap(); + let sink = PassthroughCodec::default().framed(outgoing); + let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink); + + client_stream.start_send_raw(test_msg).unwrap(); + client_stream.inner_mut().flush().await.unwrap(); + + handle.await.unwrap(); + } } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index af9bb2f08..7b7837090 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -23,6 +23,7 @@ use alloy_primitives::Sealable; use futures::{stream::Fuse, SinkExt, StreamExt}; use metrics::Gauge; use reth_eth_wire::{ + capability::RawCapabilityMessage, errors::{EthHandshakeError, EthStreamError, P2PStreamError}, message::{EthBroadcastMessage, RequestPair}, Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, @@ -278,6 +279,7 @@ impl ActiveSession { } PeerMessage::Other(other) => { debug!(target: "net::session", message_id=%other.id, "Ignoring unsupported message"); + self.queued_outgoing.push_back(OutgoingMessage::Raw(other)); } } } @@ -559,6 +561,7 @@ impl Future for ActiveSession { let res = match msg { OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg), OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg), + OutgoingMessage::Raw(msg) => this.conn.start_send_raw(msg), }; if let Err(err) = res { debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to send message"); @@ -738,6 +741,8 @@ pub(crate) enum OutgoingMessage { Eth(EthMessage), /// A message that may be shared by multiple sessions. Broadcast(EthBroadcastMessage), + /// A raw capability message + Raw(RawCapabilityMessage), } impl From> for OutgoingMessage { diff --git a/crates/net/network/src/session/conn.rs b/crates/net/network/src/session/conn.rs index 6f87c26d6..c948937a0 100644 --- a/crates/net/network/src/session/conn.rs +++ b/crates/net/network/src/session/conn.rs @@ -3,6 +3,7 @@ use futures::{Sink, Stream}; use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ + capability::RawCapabilityMessage, errors::EthStreamError, message::EthBroadcastMessage, multiplex::{ProtocolProxy, RlpxSatelliteStream}, @@ -84,6 +85,14 @@ impl EthRlpxConnection { Self::Satellite(conn) => conn.primary_mut().start_send_broadcast(item), } } + + /// Sends a raw capability message over the connection + pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> { + match self { + Self::EthOnly(conn) => conn.start_send_raw(msg), + Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg), + } + } } impl From> for EthRlpxConnection {