Subprotocol example (#8991)

Co-authored-by: owanikin <oderindeife@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Luca Provini
2024-06-26 17:54:26 +02:00
committed by GitHub
parent 1fde1dca1e
commit 063c08f561
11 changed files with 446 additions and 0 deletions

20
Cargo.lock generated
View File

@ -2810,6 +2810,26 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "example-custom-rlpx-subprotocol"
version = "0.0.0"
dependencies = [
"eyre",
"futures",
"rand 0.8.5",
"reth",
"reth-eth-wire",
"reth-network",
"reth-network-api",
"reth-node-ethereum",
"reth-primitives",
"reth-provider",
"reth-rpc-types",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "example-db-access" name = "example-db-access"
version = "0.0.0" version = "0.0.0"

View File

@ -127,6 +127,9 @@ members = [
"examples/polygon-p2p/", "examples/polygon-p2p/",
"examples/rpc-db/", "examples/rpc-db/",
"examples/txpool-tracing/", "examples/txpool-tracing/",
"examples/custom-rlpx-subprotocol",
"examples/exex/minimal/",
"examples/exex/op-bridge/",
"testing/ef-tests/", "testing/ef-tests/",
"testing/testing-utils", "testing/testing-utils",
] ]

View File

@ -0,0 +1,23 @@
[package]
name = "example-custom-rlpx-subprotocol"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
reth-eth-wire.workspace = true
reth-network.workspace = true
reth-network-api.workspace = true
reth-node-ethereum.workspace = true
reth-provider.workspace = true
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth.workspace = true
tokio-stream.workspace = true
eyre.workspace = true
rand.workspace = true
tracing.workspace = true

View File

@ -0,0 +1,104 @@
//! Example for how to customize the network layer by adding a custom rlpx subprotocol.
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p example-custom-rlpx-subprotocol -- node
//! ```
//!
//! This launch a regular reth node with a custom rlpx subprotocol.
use reth::builder::NodeHandle;
use reth_network::{
config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager,
NetworkProtocols,
};
use reth_network_api::NetworkInfo;
use reth_node_ethereum::EthereumNode;
use reth_provider::test_utils::NoopProvider;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use subprotocol::{
connection::CustomCommand,
protocol::{
event::ProtocolEvent,
handler::{CustomRlpxProtoHandler, ProtocolState},
},
};
use tokio::sync::{mpsc, oneshot};
use tracing::info;
mod subprotocol;
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _args| async move {
// launch the node
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch().await?;
let peer_id = node.network.peer_id();
let peer_addr = node.network.local_addr();
// add the custom network subprotocol to the launched node
let (tx, mut from_peer0) = mpsc::unbounded_channel();
let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } };
node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol());
// creates a separate network instance and adds the custom network subprotocol
let secret_key = SecretKey::new(&mut rand::thread_rng());
let (tx, mut from_peer1) = mpsc::unbounded_channel();
let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } };
let net_cfg = NetworkConfig::builder(secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.disable_discovery()
.add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol())
.build(NoopProvider::default());
// spawn the second network instance
let subnetwork = NetworkManager::new(net_cfg).await?;
let subnetwork_peer_id = *subnetwork.peer_id();
let subnetwork_peer_addr = subnetwork.local_addr();
let subnetwork_handle = subnetwork.peers_handle();
node.task_executor.spawn(subnetwork);
// connect the launched node to the subnetwork
node.network.peers_handle().add_peer(subnetwork_peer_id, subnetwork_peer_addr);
// connect the subnetwork to the launched node
subnetwork_handle.add_peer(*peer_id, peer_addr);
// establish connection between peer0 and peer1
let peer0_to_peer1 = from_peer0.recv().await.expect("peer0 connecting to peer1");
let peer0_conn = match peer0_to_peer1 {
ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
assert_eq!(peer_id, subnetwork_peer_id);
to_connection
}
};
// establish connection between peer1 and peer0
let peer1_to_peer0 = from_peer1.recv().await.expect("peer1 connecting to peer0");
let peer1_conn = match peer1_to_peer0 {
ProtocolEvent::Established { direction: _, peer_id: peer1_id, to_connection } => {
assert_eq!(peer1_id, *peer_id);
to_connection
}
};
info!(target:"rlpx-subprotocol", "Connection established!");
// send a ping message from peer0 to peer1
let (tx, rx) = oneshot::channel();
peer0_conn.send(CustomCommand::Message { msg: "hello!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "hello!");
info!(target:"rlpx-subprotocol", ?response, "New message received");
// send a ping message from peer1 to peer0
let (tx, rx) = oneshot::channel();
peer1_conn.send(CustomCommand::Message { msg: "world!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "world!");
info!(target:"rlpx-subprotocol", ?response, "New message received");
info!(target:"rlpx-subprotocol", "Peers connected via custom rlpx subprotocol!");
node_exit_future.await
})
}

View File

@ -0,0 +1,53 @@
use super::CustomRlpxConnection;
use crate::subprotocol::protocol::{
event::ProtocolEvent, handler::ProtocolState, proto::CustomRlpxProtoMessage,
};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_rpc_types::PeerId;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The connection handler for the custom RLPx protocol.
pub(crate) struct CustomRlpxConnectionHandler {
pub(crate) state: ProtocolState,
}
impl ConnectionHandler for CustomRlpxConnectionHandler {
type Connection = CustomRlpxConnection;
fn protocol(&self) -> Protocol {
CustomRlpxProtoMessage::protocol()
}
fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: Direction,
_peer_id: PeerId,
) -> OnNotSupported {
OnNotSupported::KeepAlive
}
fn into_connection(
self,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
let (tx, rx) = mpsc::unbounded_channel();
self.state
.events
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
.ok();
CustomRlpxConnection {
conn,
initial_ping: direction.is_outgoing().then(CustomRlpxProtoMessage::ping),
commands: UnboundedReceiverStream::new(rx),
pending_pong: None,
}
}
}

View File

@ -0,0 +1,76 @@
use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind};
use futures::{Stream, StreamExt};
use reth_eth_wire::multiplex::ProtocolConnection;
use reth_primitives::BytesMut;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;
pub(crate) mod handler;
/// We define some custom commands that the subprotocol supports.
pub(crate) enum CustomCommand {
/// Sends a message to the peer
Message {
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
},
}
/// The connection handler for the custom RLPx protocol.
pub(crate) struct CustomRlpxConnection {
conn: ProtocolConnection,
initial_ping: Option<CustomRlpxProtoMessage>,
commands: UnboundedReceiverStream<CustomCommand>,
pending_pong: Option<oneshot::Sender<String>>,
}
impl Stream for CustomRlpxConnection {
type Item = BytesMut;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(initial_ping) = this.initial_ping.take() {
return Poll::Ready(Some(initial_ping.encoded()))
}
loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
CustomCommand::Message { msg, response } => {
this.pending_pong = Some(response);
Poll::Ready(Some(CustomRlpxProtoMessage::ping_message(msg).encoded()))
}
}
}
let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };
let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None)
};
match msg.message {
CustomRlpxProtoMessageKind::Ping => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded()))
}
CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::PingMessage(msg) => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong_message(msg).encoded()))
}
CustomRlpxProtoMessageKind::PongMessage(msg) => {
if let Some(sender) = this.pending_pong.take() {
sender.send(msg).ok();
}
continue
}
}
return Poll::Pending
}
}
}

View File

@ -0,0 +1,2 @@
pub(crate) mod connection;
pub(crate) mod protocol;

View File

@ -0,0 +1,15 @@
use crate::subprotocol::connection::CustomCommand;
use reth_network::Direction;
use reth_network_api::PeerId;
use tokio::sync::mpsc;
/// The events that can be emitted by our custom protocol.
#[derive(Debug)]
pub(crate) enum ProtocolEvent {
Established {
#[allow(dead_code)]
direction: Direction,
peer_id: PeerId,
to_connection: mpsc::UnboundedSender<CustomCommand>,
},
}

View File

@ -0,0 +1,34 @@
use super::event::ProtocolEvent;
use crate::subprotocol::connection::handler::CustomRlpxConnectionHandler;
use reth_network::protocol::ProtocolHandler;
use reth_network_api::PeerId;
use std::net::SocketAddr;
use tokio::sync::mpsc;
/// Protocol state is an helper struct to store the protocol events.
#[derive(Clone, Debug)]
pub(crate) struct ProtocolState {
pub(crate) events: mpsc::UnboundedSender<ProtocolEvent>,
}
/// The protocol handler takes care of incoming and outgoing connections.
#[derive(Debug)]
pub(crate) struct CustomRlpxProtoHandler {
pub state: ProtocolState,
}
impl ProtocolHandler for CustomRlpxProtoHandler {
type ConnectionHandler = CustomRlpxConnectionHandler;
fn on_incoming(&self, _socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
Some(CustomRlpxConnectionHandler { state: self.state.clone() })
}
fn on_outgoing(
&self,
_socket_addr: SocketAddr,
_peer_id: PeerId,
) -> Option<Self::ConnectionHandler> {
Some(CustomRlpxConnectionHandler { state: self.state.clone() })
}
}

View File

@ -0,0 +1,3 @@
pub(crate) mod event;
pub(crate) mod handler;
pub(crate) mod proto;

View File

@ -0,0 +1,113 @@
//! Simple RLPx Ping Pong protocol that also support sending messages,
//! following [RLPx specs](https://github.com/ethereum/devp2p/blob/master/rlpx.md)
use reth_eth_wire::{protocol::Protocol, Capability};
use reth_primitives::{Buf, BufMut, BytesMut};
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum CustomRlpxProtoMessageId {
Ping = 0x00,
Pong = 0x01,
PingMessage = 0x02,
PongMessage = 0x03,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum CustomRlpxProtoMessageKind {
Ping,
Pong,
PingMessage(String),
PongMessage(String),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct CustomRlpxProtoMessage {
pub message_type: CustomRlpxProtoMessageId,
pub message: CustomRlpxProtoMessageKind,
}
impl CustomRlpxProtoMessage {
/// Returns the capability for the `custom_rlpx` protocol.
pub fn capability() -> Capability {
Capability::new_static("custom_rlpx", 1)
}
/// Returns the protocol for the `custom_rlpx` protocol.
pub fn protocol() -> Protocol {
Protocol::new(Self::capability(), 4)
}
/// Creates a ping message
pub fn ping_message(msg: impl Into<String>) -> Self {
Self {
message_type: CustomRlpxProtoMessageId::PingMessage,
message: CustomRlpxProtoMessageKind::PingMessage(msg.into()),
}
}
/// Creates a ping message
pub fn pong_message(msg: impl Into<String>) -> Self {
Self {
message_type: CustomRlpxProtoMessageId::PongMessage,
message: CustomRlpxProtoMessageKind::PongMessage(msg.into()),
}
}
/// Creates a ping message
pub fn ping() -> Self {
Self {
message_type: CustomRlpxProtoMessageId::Ping,
message: CustomRlpxProtoMessageKind::Ping,
}
}
/// Creates a pong message
pub fn pong() -> Self {
Self {
message_type: CustomRlpxProtoMessageId::Pong,
message: CustomRlpxProtoMessageKind::Pong,
}
}
/// Creates a new `CustomRlpxProtoMessage` with the given message ID and payload.
pub fn encoded(&self) -> BytesMut {
let mut buf = BytesMut::new();
buf.put_u8(self.message_type as u8);
match &self.message {
CustomRlpxProtoMessageKind::Ping | CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::PingMessage(msg) |
CustomRlpxProtoMessageKind::PongMessage(msg) => {
buf.put(msg.as_bytes());
}
}
buf
}
/// Decodes a `CustomRlpxProtoMessage` from the given message buffer.
pub fn decode_message(buf: &mut &[u8]) -> Option<Self> {
if buf.is_empty() {
return None;
}
let id = buf[0];
buf.advance(1);
let message_type = match id {
0x00 => CustomRlpxProtoMessageId::Ping,
0x01 => CustomRlpxProtoMessageId::Pong,
0x02 => CustomRlpxProtoMessageId::PingMessage,
0x03 => CustomRlpxProtoMessageId::PongMessage,
_ => return None,
};
let message = match message_type {
CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping,
CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong,
CustomRlpxProtoMessageId::PingMessage => CustomRlpxProtoMessageKind::PingMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
CustomRlpxProtoMessageId::PongMessage => CustomRlpxProtoMessageKind::PongMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
};
Some(Self { message_type, message })
}
}