Implement ETH P2P (#81)

* refactor: move things to types

* feat(ethwire): bring in message type from ethp2p

30c11138d5/src/message.rs

* feat(ethwire): add eth-stream with Stream/Sink impls

* feat(ethwire): make Sink error an EthStreamError

* feat(ecies): expose util module

* add more deps

* feat: add broadcast newblockhashes

* fix: rlp serialize with message-id first

* chore: cleanup doc

* wip: test eth connection

* bump cargo lock

* feat: add rlp codec and get stream tests working

* fix: convert RlpCodec to PassthroughCodec

we were rlp encoding twice

* Revert "fix: convert RlpCodec to PassthroughCodec"

This reverts commit 6e6e0a58112c14d7ffba62dc83f9747ddc280641.

This does not handle framing, which would fail decoding if a partial
write happened to the socket.

* add tracing

* refactor: add handshake error

* feat(ethwire): add status handshake

* test(ethwire): handshake works

* refactor: expose EthStream::new

* chore: clippy lints

* fix: get rid of rlp codec

we can instead use LengthLimitedCodec from Tokio IO, which we re-export
as PassthroughCodec

* fix(eth): add handshake + msg error checks

1. 10MB message lengths
2. Same Genesis, Version, Chain on Status Handshake

* chore: ignore result_large_err

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Georgios Konstantopoulos
2022-10-16 20:10:25 -07:00
committed by GitHub
parent 5dfe5ac29b
commit 8009d997c0
18 changed files with 805 additions and 27 deletions

9
Cargo.lock generated
View File

@ -2199,12 +2199,21 @@ dependencies = [
"bytes",
"crc",
"ethers-core",
"futures",
"hex",
"hex-literal",
"maplit",
"pin-project",
"rand",
"reth-ecies",
"reth-primitives",
"reth-rlp",
"secp256k1",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
]
[[package]]

View File

@ -10,6 +10,7 @@ fn zeroless_view(v: &impl AsRef<[u8]>) -> &[u8] {
}
impl Header {
/// Encodes the header into the `out` buffer.
pub fn encode(&self, out: &mut dyn BufMut) {
if self.payload_length < 56 {
let code = if self.list { EMPTY_LIST_CODE } else { EMPTY_STRING_CODE };
@ -22,6 +23,13 @@ impl Header {
out.put_slice(len_be);
}
}
/// Returns the length of the encoded header
pub fn length(&self) -> usize {
let mut out = BytesMut::new();
self.encode(&mut out);
out.len()
}
}
pub const fn length_of_length(payload_length: usize) -> usize {

View File

@ -1,4 +1,4 @@
#[derive(Clone, Default, PartialEq)]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Header {
pub list: bool,
pub payload_length: usize,

View File

@ -2,7 +2,7 @@ use crate::{
algorithm::{ECIES, MAX_BODY_SIZE},
ECIESError, EgressECIESValue, IngressECIESValue,
};
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use reth_primitives::H512 as PeerId;
use secp256k1::SecretKey;
use std::{fmt::Debug, io};
@ -105,7 +105,8 @@ impl Decoder for ECIESCodec {
}
let mut data = buf.split_to(self.ecies.body_len());
let ret = Bytes::copy_from_slice(self.ecies.read_body(&mut data)?);
let mut ret = BytesMut::new();
ret.extend_from_slice(self.ecies.read_body(&mut data)?);
self.state = ECIESState::Header;
return Ok(Some(IngressECIESValue::Message(ret)))

View File

@ -1,6 +1,5 @@
use thiserror::Error;
use crate::IngressECIESValue;
use thiserror::Error;
/// An error that occurs while reading or writing to an ECIES stream.
#[derive(Debug, Error)]

View File

@ -1,3 +1,4 @@
#![allow(clippy::result_large_err)]
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
@ -10,7 +11,7 @@
pub mod algorithm;
pub mod mac;
pub mod stream;
mod util;
pub mod util;
mod error;
pub use error::ECIESError;
@ -38,5 +39,5 @@ pub enum IngressECIESValue {
/// Receiving an ACK message
Ack,
/// Receiving a message
Message(bytes::Bytes),
Message(bytes::BytesMut),
}

View File

@ -1,5 +1,5 @@
//! The ECIES Stream implementation which wraps over [`AsyncRead`] and [`AsyncWrite`].
use crate::{ECIESError, EgressECIESValue, IngressECIESValue};
use crate::{codec::ECIESCodec, ECIESError, EgressECIESValue, IngressECIESValue};
use bytes::Bytes;
use futures::{ready, Sink, SinkExt};
use reth_primitives::H512 as PeerId;
@ -19,8 +19,6 @@ use tokio_stream::{Stream, StreamExt};
use tokio_util::codec::{Decoder, Framed};
use tracing::{debug, instrument, trace};
use crate::codec::ECIESCodec;
/// `ECIES` stream over TCP exchanging raw bytes
#[derive(Debug)]
pub struct ECIESStream<Io> {
@ -106,7 +104,7 @@ impl<Io> Stream for ECIESStream<Io>
where
Io: AsyncRead + Unpin,
{
type Item = Result<Bytes, io::Error>;
type Item = Result<bytes::BytesMut, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(Pin::new(&mut self.get_mut().stream).poll_next(cx)) {
@ -156,8 +154,6 @@ mod tests {
use super::*;
#[tokio::test]
// TODO: implement test for the proposed
// API: https://github.com/foundry-rs/reth/issues/64#issue-1408708420
async fn can_write_and_read() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());

View File

@ -1,3 +1,5 @@
//! Utility functions for hashing and encoding.
use hmac::{Hmac, Mac};
use reth_primitives::{H256, H512 as PeerId};
use secp256k1::PublicKey;
@ -22,7 +24,7 @@ pub(crate) fn hmac_sha256(key: &[u8], input: &[&[u8]], auth_data: &[u8]) -> H256
/// Converts a [secp256k1::PublicKey] to a [PeerId] by stripping the
/// SECP256K1_TAG_PUBKEY_UNCOMPRESSED tag and storing the rest of the slice in the [PeerId].
pub(crate) fn pk2id(pk: &PublicKey) -> PeerId {
pub fn pk2id(pk: &PublicKey) -> PeerId {
PeerId::from_slice(&pk.serialize_uncompressed()[1..])
}

View File

@ -1,5 +1,6 @@
[package]
name = "reth-eth-wire"
description = "Impements the eth/64 and eth/65 P2P protocols"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
@ -12,13 +13,22 @@ hex = "0.4"
thiserror = "1"
# reth
reth-ecies = { path = "../ecies" }
reth-primitives = { path = "../../primitives" }
reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types"] }
#used for forkid
crc = "1"
maplit = "1"
tokio = { version = "1.21.2", features = ["full"] }
futures = "0.3.24"
tokio-stream = "0.1.11"
secp256k1 = { version = "0.24.0", features = ["global-context", "rand-std", "recovery"] }
tokio-util = { version = "0.7.4", features = ["io"] }
pin-project = "1.0"
tracing = "0.1.37"
[dev-dependencies]
hex-literal = "0.3"
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false }
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false }
rand = "0.8"

View File

@ -0,0 +1,39 @@
//! Error cases when handling a [`crate::EthStream`]
use std::io;
use reth_primitives::{Chain, H256};
use crate::types::forkid::ValidationError;
/// Errors when sending/receiving messages
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum EthStreamError {
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Rlp(#[from] reth_rlp::DecodeError),
#[error(transparent)]
HandshakeError(#[from] HandshakeError),
#[error("message size ({0}) exceeds max length (10MB)")]
MessageTooBig(usize),
}
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum HandshakeError {
#[error("status message can only be recv/sent in handshake")]
StatusNotInHandshake,
#[error("received non-status message when trying to handshake")]
NonStatusMessageInHandshake,
#[error("no response received when sending out handshake")]
NoResponse,
#[error(transparent)]
InvalidFork(#[from] ValidationError),
#[error("mismatched genesis in Status message. expected: {expected:?}, got: {got:?}")]
MismatchedGenesis { expected: H256, got: H256 },
#[error("mismatched protocol version in Status message. expected: {expected:?}, got: {got:?}")]
MismatchedProtocolVersion { expected: u8, got: u8 },
#[error("mismatched chain in Status message. expected: {expected:?}, got: {got:?}")]
MismatchedChain { expected: Chain, got: Chain },
}

View File

@ -4,13 +4,13 @@
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Implementation of the `eth` wire protocol.
//! Types for the eth wire protocol.
pub use tokio_util::codec::{
LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError,
};
pub mod error;
mod stream;
pub mod types;
mod status;
pub use status::Status;
mod version;
pub use version::EthVersion;
pub mod forkid;
pub use stream::EthStream;

View File

@ -0,0 +1,301 @@
use crate::{
error::{EthStreamError, HandshakeError},
types::{forkid::ForkFilter, EthMessage, ProtocolMessage, Status},
};
use bytes::BytesMut;
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
use reth_rlp::{Decodable, Encodable};
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// An `EthStream` wraps over any `Stream` that yields bytes and makes it
/// compatible with eth-networking protocol messages, which get RLP encoded/decoded.
#[pin_project]
pub struct EthStream<S> {
#[pin]
inner: S,
/// Whether the `Status` handshake has been completed
authed: bool,
}
impl<S> EthStream<S> {
/// Creates a new unauthed [`EthStream`] from a provided stream. You will need
/// to manually handshake a peer.
pub fn new(inner: S) -> Self {
Self { inner, authed: false }
}
}
impl<S> EthStream<S>
where
S: Stream<Item = Result<bytes::BytesMut, io::Error>>
+ Sink<bytes::Bytes, Error = io::Error>
+ Unpin,
{
/// Given an instantiated transport layer, it proceeds to return an [`EthStream`]
/// after performing a [`Status`] message handshake as specified in
pub async fn connect(
inner: S,
status: Status,
fork_filter: ForkFilter,
) -> Result<Self, EthStreamError> {
let mut this = Self::new(inner);
this.handshake(status, fork_filter).await?;
Ok(this)
}
/// Performs a handshake with the connected peer over the transport stream.
pub async fn handshake(
&mut self,
status: Status,
fork_filter: ForkFilter,
) -> Result<(), EthStreamError> {
tracing::trace!("sending eth status ...");
self.send(EthMessage::Status(status)).await?;
tracing::trace!("waiting for eth status from peer ...");
let msg = self
.next()
.await
.ok_or_else(|| EthStreamError::HandshakeError(HandshakeError::NoResponse))??;
// TODO: Add any missing checks
// https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89
match msg {
EthMessage::Status(resp) => {
self.authed = true;
if status.genesis != resp.genesis {
return Err(HandshakeError::MismatchedGenesis {
expected: status.genesis,
got: resp.genesis,
}
.into())
}
if status.version != resp.version {
return Err(HandshakeError::MismatchedProtocolVersion {
expected: status.version,
got: resp.version,
}
.into())
}
if status.chain != resp.chain {
return Err(HandshakeError::MismatchedChain {
expected: status.chain,
got: resp.chain,
}
.into())
}
Ok(fork_filter.validate(resp.forkid).map_err(HandshakeError::InvalidFork)?)
}
_ => Err(EthStreamError::HandshakeError(HandshakeError::NonStatusMessageInHandshake)),
}
}
}
impl<S> Stream for EthStream<S>
where
S: Stream<Item = Result<bytes::BytesMut, io::Error>> + Unpin,
{
type Item = Result<EthMessage, EthStreamError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let res = ready!(this.inner.poll_next(cx));
let bytes = match res {
Some(Ok(bytes)) => bytes,
Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
None => return Poll::Ready(None),
};
if bytes.len() > MAX_MESSAGE_SIZE {
return Poll::Ready(Some(Err(EthStreamError::MessageTooBig(bytes.len()))))
}
let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
if *this.authed && matches!(msg.message, EthMessage::Status(_)) {
return Poll::Ready(Some(Err(EthStreamError::HandshakeError(
HandshakeError::StatusNotInHandshake,
))))
}
Poll::Ready(Some(Ok(msg.message)))
}
}
impl<S> Sink<EthMessage> for EthStream<S>
where
S: Sink<bytes::Bytes, Error = io::Error> + Unpin,
{
type Error = EthStreamError;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx).map_err(Into::into)
}
fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
if self.authed && matches!(item, EthMessage::Status(_)) {
return Err(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake))
}
let mut bytes = BytesMut::new();
ProtocolMessage::from(item).encode(&mut bytes);
let bytes = bytes.freeze();
self.project().inner.start_send(bytes)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx).map_err(Into::into)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use crate::{
types::{broadcast::BlockHashNumber, forkid::ForkFilter, EthMessage, Status},
EthStream, PassthroughCodec,
};
use futures::{SinkExt, StreamExt};
use reth_ecies::{stream::ECIESStream, util::pk2id};
use secp256k1::{SecretKey, SECP256K1};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Decoder;
use crate::types::EthVersion;
use ethers_core::types::Chain;
use reth_primitives::{H256, U256};
#[tokio::test]
async fn can_handshake() {
let genesis = H256::random();
let fork_filter = ForkFilter::new(0, genesis, vec![]);
let status = Status {
version: EthVersion::Eth67 as u8,
chain: Chain::Mainnet.into(),
total_difficulty: U256::from(0),
blockhash: H256::random(),
genesis,
// Pass the current fork id.
forkid: fork_filter.current(),
};
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let status_clone = status;
let fork_filter_clone = fork_filter.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = crate::PassthroughCodec::default().framed(incoming);
let _ = EthStream::connect(stream, status_clone, fork_filter_clone).await.unwrap();
});
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let sink = crate::PassthroughCodec::default().framed(outgoing);
// try to connect
let _ = EthStream::connect(sink, status, fork_filter).await.unwrap();
// wait for it to finish
handle.await.unwrap();
}
#[tokio::test]
async fn can_write_and_read_cleartext() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let test_msg = EthMessage::NewBlockHashes(
vec![
BlockHashNumber { hash: reth_primitives::H256::random(), number: 5 },
BlockHashNumber { hash: reth_primitives::H256::random(), number: 6 },
]
.into(),
);
let test_msg_clone = test_msg.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = PassthroughCodec::default().framed(incoming);
let mut stream = EthStream::new(stream);
// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message, test_msg_clone);
});
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let sink = PassthroughCodec::default().framed(outgoing);
let mut client_stream = EthStream::new(sink);
client_stream.send(test_msg).await.unwrap();
// make sure the server receives the message and asserts before ending the test
handle.await.unwrap();
}
#[tokio::test]
async fn can_write_and_read_ecies() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());
let test_msg = EthMessage::NewBlockHashes(
vec![
BlockHashNumber { hash: reth_primitives::H256::random(), number: 5 },
BlockHashNumber { hash: reth_primitives::H256::random(), number: 6 },
]
.into(),
);
let test_msg_clone = test_msg.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
let mut stream = EthStream::new(stream);
// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
assert_eq!(message, test_msg_clone);
});
// create the server pubkey
let server_id = pk2id(&server_key.public_key(SECP256K1));
let client_key = SecretKey::new(&mut rand::thread_rng());
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
let mut client_stream = EthStream::new(outgoing);
client_stream.send(test_msg).await.unwrap();
// make sure the server receives the message and asserts before ending the test
handle.await.unwrap();
}
}

View File

@ -0,0 +1,31 @@
//! Types for broadcasting new data.
use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
/// This informs peers of new blocks that have appeared on the network.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)]
pub struct NewBlockHashes(
/// New block hashes and the block number for each blockhash.
/// Clients should request blocks using a [`GetBlockBodies`](crate::GetBlockBodies) message.
pub Vec<BlockHashNumber>,
);
/// A block hash _and_ a block number.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct BlockHashNumber {
/// The block hash
pub hash: reth_primitives::H256,
/// The block number
pub number: u64,
}
impl From<Vec<BlockHashNumber>> for NewBlockHashes {
fn from(v: Vec<BlockHashNumber>) -> Self {
NewBlockHashes(v)
}
}
impl From<NewBlockHashes> for Vec<BlockHashNumber> {
fn from(v: NewBlockHashes) -> Self {
v.0
}
}

View File

@ -0,0 +1,367 @@
#![allow(missing_docs)]
use super::{broadcast::NewBlockHashes, Status};
use bytes::Buf;
use reth_rlp::{length_of_length, Decodable, Encodable, Header};
use std::fmt::Debug;
#[derive(Clone, Debug, PartialEq, Eq)]
/// An `eth` protocol message, containing a message ID and payload.
pub struct ProtocolMessage {
pub message_type: EthMessageID,
pub message: EthMessage,
}
impl ProtocolMessage {
/// Create a new ProtocolMessage from a message type and message rlp bytes.
pub fn decode_message(
message_type: EthMessageID,
buf: &mut &[u8],
) -> Result<Self, reth_rlp::DecodeError> {
let message = match message_type {
EthMessageID::Status => EthMessage::Status(Status::decode(buf)?),
EthMessageID::NewBlockHashes => {
EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
}
_ => unimplemented!(),
// EthMessageID::NewBlock => EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?)),
// EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
// EthMessageID::NewPooledTransactionHashes => {
// EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes::decode(buf)?)
// }
// EthMessageID::GetBlockHeaders => {
// let request_pair = RequestPair::<GetBlockHeaders>::decode(buf)?;
// EthMessage::GetBlockHeaders(request_pair)
// }
// EthMessageID::BlockHeaders => {
// let request_pair = RequestPair::<BlockHeaders>::decode(buf)?;
// EthMessage::BlockHeaders(request_pair)
// }
// EthMessageID::GetBlockBodies => {
// let request_pair = RequestPair::<GetBlockBodies>::decode(buf)?;
// EthMessage::GetBlockBodies(request_pair)
// }
// EthMessageID::BlockBodies => {
// let request_pair = RequestPair::<BlockBodies>::decode(buf)?;
// EthMessage::BlockBodies(request_pair)
// }
// EthMessageID::GetPooledTransactions => {
// let request_pair = RequestPair::<GetPooledTransactions>::decode(buf)?;
// EthMessage::GetPooledTransactions(request_pair)
// }
// EthMessageID::PooledTransactions => {
// let request_pair = RequestPair::<PooledTransactions>::decode(buf)?;
// EthMessage::PooledTransactions(request_pair)
// }
// EthMessageID::GetNodeData => {
// let request_pair = RequestPair::<GetNodeData>::decode(buf)?;
// EthMessage::GetNodeData(request_pair)
// }
// EthMessageID::NodeData => {
// let request_pair = RequestPair::<NodeData>::decode(buf)?;
// EthMessage::NodeData(request_pair)
// }
// EthMessageID::GetReceipts => {
// let request_pair = RequestPair::<GetReceipts>::decode(buf)?;
// EthMessage::GetReceipts(request_pair)
// }
// EthMessageID::Receipts => {
// let request_pair = RequestPair::<Receipts>::decode(buf)?;
// EthMessage::Receipts(request_pair)
// }
};
Ok(ProtocolMessage { message_type, message })
}
}
/// Encodes the protocol message into bytes.
/// The message type is encoded as a single byte and prepended to the message.
impl Encodable for ProtocolMessage {
fn length(&self) -> usize {
self.message_type.length() + self.message.length()
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
self.message_type.encode(out);
self.message.encode(out);
}
}
/// Decodes a protocol message from bytes, using the first byte to determine the message type.
/// This decodes `eth/66` request ids for each message type.
impl Decodable for ProtocolMessage {
fn decode(buf: &mut &[u8]) -> Result<Self, reth_rlp::DecodeError> {
let message_type = EthMessageID::decode(buf)?;
Self::decode_message(message_type, buf)
}
}
impl From<EthMessage> for ProtocolMessage {
fn from(message: EthMessage) -> Self {
ProtocolMessage { message_type: message.message_id(), message }
}
}
// TODO: determine whats up with this enum variant size warning
/// Represents a message in the eth wire protocol, versions 66 and 67.
///
/// The ethereum wire protocol is a set of messages that are broadcasted to the network in two
/// styles:
/// * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
/// response message (such as [`PooledTransactions`]).
/// * A message that is broadcast to the network, without a corresponding request.
///
/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
/// correlate request-response message pairs. This allows for request multiplexing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthMessage {
// Status is required for the protocol handshake
Status(Status),
// // The following messages are broadcast to the network
NewBlockHashes(NewBlockHashes),
// NewBlock(Box<NewBlock>),
// Transactions(Transactions),
// NewPooledTransactionHashes(NewPooledTransactionHashes),
// // The following messages are request-response message pairs
// GetBlockHeaders(RequestPair<GetBlockHeaders>),
// BlockHeaders(RequestPair<BlockHeaders>),
// GetBlockBodies(RequestPair<GetBlockBodies>),
// BlockBodies(RequestPair<BlockBodies>),
// GetPooledTransactions(RequestPair<GetPooledTransactions>),
// PooledTransactions(RequestPair<PooledTransactions>),
// GetNodeData(RequestPair<GetNodeData>),
// NodeData(RequestPair<NodeData>),
// GetReceipts(RequestPair<GetReceipts>),
// Receipts(RequestPair<Receipts>),
}
impl EthMessage {
/// Returns the message's ID.
pub fn message_id(&self) -> EthMessageID {
match self {
EthMessage::Status(_) => EthMessageID::Status,
EthMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
// EthMessage::NewBlock(_) => EthMessageID::NewBlock,
// EthMessage::Transactions(_) => EthMessageID::Transactions,
// EthMessage::NewPooledTransactionHashes(_) =>
// EthMessageID::NewPooledTransactionHashes, EthMessage::GetBlockHeaders(_)
// => EthMessageID::GetBlockHeaders, EthMessage::BlockHeaders(_) =>
// EthMessageID::BlockHeaders, EthMessage::GetBlockBodies(_) =>
// EthMessageID::GetBlockBodies, EthMessage::BlockBodies(_) =>
// EthMessageID::BlockBodies, EthMessage::GetPooledTransactions(_) =>
// EthMessageID::GetPooledTransactions, EthMessage::PooledTransactions(_) =>
// EthMessageID::PooledTransactions, EthMessage::GetNodeData(_) =>
// EthMessageID::GetNodeData, EthMessage::NodeData(_) =>
// EthMessageID::NodeData, EthMessage::GetReceipts(_) =>
// EthMessageID::GetReceipts, EthMessage::Receipts(_) =>
// EthMessageID::Receipts,
}
}
}
impl Encodable for EthMessage {
fn length(&self) -> usize {
match self {
EthMessage::Status(status) => status.length(),
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
// EthMessage::NewBlock(new_block) => new_block.length(),
// EthMessage::Transactions(transactions) => transactions.length(),
// EthMessage::NewPooledTransactionHashes(hashes) => hashes.length(),
// EthMessage::GetBlockHeaders(request) => request.length(),
// EthMessage::BlockHeaders(headers) => headers.length(),
// EthMessage::GetBlockBodies(request) => request.length(),
// EthMessage::BlockBodies(bodies) => bodies.length(),
// EthMessage::GetPooledTransactions(request) => request.length(),
// EthMessage::PooledTransactions(transactions) => transactions.length(),
// EthMessage::GetNodeData(request) => request.length(),
// EthMessage::NodeData(data) => data.length(),
// EthMessage::GetReceipts(request) => request.length(),
// EthMessage::Receipts(receipts) => receipts.length(),
}
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
match self {
EthMessage::Status(status) => status.encode(out),
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
// EthMessage::NewBlock(new_block) => new_block.encode(out),
// EthMessage::Transactions(transactions) => transactions.encode(out),
// EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out),
// EthMessage::GetBlockHeaders(request) => request.encode(out),
// EthMessage::BlockHeaders(headers) => headers.encode(out),
// EthMessage::GetBlockBodies(request) => request.encode(out),
// EthMessage::BlockBodies(bodies) => bodies.encode(out),
// EthMessage::GetPooledTransactions(request) => request.encode(out),
// EthMessage::PooledTransactions(transactions) => transactions.encode(out),
// EthMessage::GetNodeData(request) => request.encode(out),
// EthMessage::NodeData(data) => data.encode(out),
// EthMessage::GetReceipts(request) => request.encode(out),
// EthMessage::Receipts(receipts) => receipts.encode(out),
}
}
}
/// Represents message IDs for eth protocol messages.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EthMessageID {
Status = 0x00,
NewBlockHashes = 0x01,
Transactions = 0x02,
GetBlockHeaders = 0x03,
BlockHeaders = 0x04,
GetBlockBodies = 0x05,
BlockBodies = 0x06,
NewBlock = 0x07,
NewPooledTransactionHashes = 0x08,
GetPooledTransactions = 0x09,
PooledTransactions = 0x0a,
GetNodeData = 0x0d,
NodeData = 0x0e,
GetReceipts = 0x0f,
Receipts = 0x10,
}
impl Encodable for EthMessageID {
fn length(&self) -> usize {
1
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
out.put_u8(*self as u8);
}
}
impl Decodable for EthMessageID {
fn decode(buf: &mut &[u8]) -> Result<Self, reth_rlp::DecodeError> {
let id = buf.first().ok_or(reth_rlp::DecodeError::InputTooShort)?;
let id = match id {
0x00 => EthMessageID::Status,
0x01 => EthMessageID::NewBlockHashes,
0x02 => EthMessageID::Transactions,
0x03 => EthMessageID::GetBlockHeaders,
0x04 => EthMessageID::BlockHeaders,
0x05 => EthMessageID::GetBlockBodies,
0x06 => EthMessageID::BlockBodies,
0x07 => EthMessageID::NewBlock,
0x08 => EthMessageID::NewPooledTransactionHashes,
0x09 => EthMessageID::GetPooledTransactions,
0x0a => EthMessageID::PooledTransactions,
0x0d => EthMessageID::GetNodeData,
0x0e => EthMessageID::NodeData,
0x0f => EthMessageID::GetReceipts,
0x10 => EthMessageID::Receipts,
_ => return Err(reth_rlp::DecodeError::Custom("Invalid message ID")),
};
buf.advance(1);
Ok(id)
}
}
impl TryFrom<usize> for EthMessageID {
type Error = &'static str;
fn try_from(value: usize) -> Result<Self, Self::Error> {
match value {
0x00 => Ok(EthMessageID::Status),
0x01 => Ok(EthMessageID::NewBlockHashes),
0x02 => Ok(EthMessageID::Transactions),
0x03 => Ok(EthMessageID::GetBlockHeaders),
0x04 => Ok(EthMessageID::BlockHeaders),
0x05 => Ok(EthMessageID::GetBlockBodies),
0x06 => Ok(EthMessageID::BlockBodies),
0x07 => Ok(EthMessageID::NewBlock),
0x08 => Ok(EthMessageID::NewPooledTransactionHashes),
0x09 => Ok(EthMessageID::GetPooledTransactions),
0x0a => Ok(EthMessageID::PooledTransactions),
0x0d => Ok(EthMessageID::GetNodeData),
0x0e => Ok(EthMessageID::NodeData),
0x0f => Ok(EthMessageID::GetReceipts),
0x10 => Ok(EthMessageID::Receipts),
_ => Err("Invalid message ID"),
}
}
}
/// This is used for all request-response style `eth` protocol messages.
/// This can represent either a request or a response, since both include a message payload and
/// request id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RequestPair<T> {
/// id for the contained request or response message
pub request_id: u64,
/// the request or response message payload
pub message: T,
}
/// Allows messages with request ids to be serialized into RLP bytes.
impl<T> Encodable for RequestPair<T>
where
T: Encodable,
{
fn length(&self) -> usize {
let mut length = 0;
length += self.request_id.length();
length += self.message.length();
length += length_of_length(length);
length
}
fn encode(&self, out: &mut dyn reth_rlp::BufMut) {
let header =
Header { list: true, payload_length: self.request_id.length() + self.message.length() };
header.encode(out);
self.request_id.encode(out);
self.message.encode(out);
}
}
/// Allows messages with request ids to be deserialized into RLP bytes.
impl<T> Decodable for RequestPair<T>
where
T: Decodable,
{
fn decode(buf: &mut &[u8]) -> Result<Self, reth_rlp::DecodeError> {
let _header = Header::decode(buf)?;
Ok(Self { request_id: u64::decode(buf)?, message: T::decode(buf)? })
}
}
#[cfg(test)]
mod test {
use crate::types::message::RequestPair;
use hex_literal::hex;
use reth_rlp::{Decodable, Encodable};
fn encode<T: Encodable>(value: T) -> Vec<u8> {
let mut buf = vec![];
value.encode(&mut buf);
buf
}
#[test]
fn request_pair_encode() {
let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };
// c5: start of list (c0) + len(full_list) (length is <55 bytes)
// 82: 0x80 + len(1337)
// 05 39: 1337 (request_id)
// === full_list ===
// c1: start of list (c0) + len(list) (length is <55 bytes)
// 05: 5 (message)
let expected = hex!("c5820539c105");
let got = encode(request_pair);
assert_eq!(expected[..], got, "expected: {:X?}, got: {:X?}", expected, got,);
}
#[test]
fn request_pair_decode() {
let raw_pair = &hex!("c5820539c105")[..];
let expected = RequestPair { request_id: 1337, message: vec![5u8] };
let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
assert_eq!(expected.length(), raw_pair.len());
assert_eq!(expected, got);
}
}

View File

@ -0,0 +1,14 @@
//! Types for the eth wire protocol.
mod status;
pub use status::Status;
mod version;
pub use version::EthVersion;
pub mod forkid;
pub mod message;
pub use message::{EthMessage, EthMessageID, ProtocolMessage};
pub mod broadcast;

View File

@ -1,4 +1,4 @@
use crate::forkid::ForkId;
use super::forkid::ForkId;
use reth_primitives::{Chain, H256, U256};
use reth_rlp::{RlpDecodable, RlpEncodable};
use std::fmt::{Debug, Display};
@ -92,7 +92,7 @@ mod tests {
use reth_rlp::{Decodable, Encodable};
use std::str::FromStr;
use crate::{
use crate::types::{
forkid::{ForkHash, ForkId},
EthVersion, Status,
};

View File

@ -20,7 +20,7 @@ pub enum EthVersion {
///
/// # Example
/// ```
/// use reth_eth_wire::EthVersion;
/// use reth_eth_wire::types::EthVersion;
///
/// let version = EthVersion::try_from("67").unwrap();
/// assert_eq!(version, EthVersion::Eth67);
@ -42,7 +42,7 @@ impl TryFrom<&str> for EthVersion {
///
/// # Example
/// ```
/// use reth_eth_wire::EthVersion;
/// use reth_eth_wire::types::EthVersion;
///
/// let version = EthVersion::try_from(67).unwrap();
/// assert_eq!(version, EthVersion::Eth67);