feat: Support sending raw capability messages (#13028)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Steven
2024-12-05 09:11:15 -06:00
committed by GitHub
parent da03b7989e
commit 6f0cfefe51
4 changed files with 65 additions and 2 deletions

View File

@ -23,7 +23,7 @@ use std::{
pub struct RawCapabilityMessage {
/// Identifier of the message.
pub id: usize,
/// Actual payload
/// Actual __encoded__ payload
pub payload: Bytes,
}

View File

@ -1,4 +1,5 @@
use crate::{
capability::RawCapabilityMessage,
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
@ -6,6 +7,7 @@ use crate::{
Status,
};
use alloy_primitives::bytes::{Bytes, BytesMut};
use alloy_rlp::Encodable;
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
use reth_eth_wire_types::NetworkPrimitives;
@ -252,6 +254,16 @@ where
Ok(())
}
/// Sends a raw capability message directly over the stream
pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
let mut bytes = Vec::new();
msg.id.encode(&mut bytes);
bytes.extend_from_slice(&msg.payload);
self.inner.start_send_unpin(bytes.into())?;
Ok(())
}
}
impl<S, E, N> Stream for EthStream<S, N>
@ -361,13 +373,15 @@ mod tests {
use crate::{
broadcast::BlockHashNumber,
errors::{EthHandshakeError, EthStreamError},
ethstream::RawCapabilityMessage,
hello::DEFAULT_TCP_PORT,
p2pstream::UnauthedP2PStream,
EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
ProtocolVersion, Status,
};
use alloy_chains::NamedChain;
use alloy_primitives::{B256, U256};
use alloy_primitives::{bytes::Bytes, B256, U256};
use alloy_rlp::Decodable;
use futures::{SinkExt, StreamExt};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire_types::EthNetworkPrimitives;
@ -743,4 +757,39 @@ mod tests {
matches!(handshake_result, Err(e) if e.to_string() == EthStreamError::StreamTimeout.to_string())
);
}
#[tokio::test]
async fn can_write_and_read_raw_capability() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) };
let test_msg_clone = test_msg.clone();
let handle = tokio::spawn(async move {
let (incoming, _) = listener.accept().await.unwrap();
let stream = PassthroughCodec::default().framed(incoming);
let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream);
let bytes = stream.inner_mut().next().await.unwrap().unwrap();
// Create a cursor to track position while decoding
let mut id_bytes = &bytes[..];
let decoded_id = <usize as Decodable>::decode(&mut id_bytes).unwrap();
assert_eq!(decoded_id, test_msg_clone.id);
// Get remaining bytes after ID decoding
let remaining = id_bytes;
assert_eq!(remaining, &test_msg_clone.payload[..]);
});
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let sink = PassthroughCodec::default().framed(outgoing);
let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink);
client_stream.start_send_raw(test_msg).unwrap();
client_stream.inner_mut().flush().await.unwrap();
handle.await.unwrap();
}
}

View File

@ -23,6 +23,7 @@ use alloy_primitives::Sealable;
use futures::{stream::Fuse, SinkExt, StreamExt};
use metrics::Gauge;
use reth_eth_wire::{
capability::RawCapabilityMessage,
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
message::{EthBroadcastMessage, RequestPair},
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives,
@ -278,6 +279,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
}
PeerMessage::Other(other) => {
debug!(target: "net::session", message_id=%other.id, "Ignoring unsupported message");
self.queued_outgoing.push_back(OutgoingMessage::Raw(other));
}
}
}
@ -559,6 +561,7 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
let res = match msg {
OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg),
OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg),
OutgoingMessage::Raw(msg) => this.conn.start_send_raw(msg),
};
if let Err(err) = res {
debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to send message");
@ -738,6 +741,8 @@ pub(crate) enum OutgoingMessage<N: NetworkPrimitives> {
Eth(EthMessage<N>),
/// A message that may be shared by multiple sessions.
Broadcast(EthBroadcastMessage<N>),
/// A raw capability message
Raw(RawCapabilityMessage),
}
impl<N: NetworkPrimitives> From<EthMessage<N>> for OutgoingMessage<N> {

View File

@ -3,6 +3,7 @@
use futures::{Sink, Stream};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
capability::RawCapabilityMessage,
errors::EthStreamError,
message::EthBroadcastMessage,
multiplex::{ProtocolProxy, RlpxSatelliteStream},
@ -84,6 +85,14 @@ impl<N: NetworkPrimitives> EthRlpxConnection<N> {
Self::Satellite(conn) => conn.primary_mut().start_send_broadcast(item),
}
}
/// Sends a raw capability message over the connection
pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
match self {
Self::EthOnly(conn) => conn.start_send_raw(msg),
Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg),
}
}
}
impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {