From 8009d997c0180aa459b0ce090a9654bb33a6ca80 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 16 Oct 2022 20:10:25 -0700 Subject: [PATCH] Implement ETH P2P (#81) * refactor: move things to types * feat(ethwire): bring in message type from ethp2p https://github.com/Rjected/ethp2p/blob/30c11138d52b8b695f9abc0ae8dab2d33600bc1c/src/message.rs * feat(ethwire): add eth-stream with Stream/Sink impls * feat(ethwire): make Sink error an EthStreamError * feat(ecies): expose util module * add more deps * feat: add broadcast newblockhashes * fix: rlp serialize with message-id first * chore: cleanup doc * wip: test eth connection * bump cargo lock * feat: add rlp codec and get stream tests working * fix: convert RlpCodec to PassthroughCodec we were rlp encoding twice * Revert "fix: convert RlpCodec to PassthroughCodec" This reverts commit 6e6e0a58112c14d7ffba62dc83f9747ddc280641. This does not handle framing, which would fail decoding if a partial write happened to the socket. * add tracing * refactor: add handshake error * feat(ethwire): add status handshake * test(ethwire): handshake works * refactor: expose EthStream::new * chore: clippy lints * fix: get rid of rlp codec we can instead use LengthLimitedCodec from Tokio IO, which we re-export as PassthroughCodec * fix(eth): add handshake + msg error checks 1. 10MB message lengths 2. Same Genesis, Version, Chain on Status Handshake * chore: ignore result_large_err Co-authored-by: Matthias Seitz --- Cargo.lock | 9 + crates/common/rlp/src/encode.rs | 8 + crates/common/rlp/src/types.rs | 2 +- crates/net/ecies/src/codec.rs | 5 +- crates/net/ecies/src/error.rs | 3 +- crates/net/ecies/src/lib.rs | 5 +- crates/net/ecies/src/stream.rs | 8 +- crates/net/ecies/src/util.rs | 4 +- crates/net/eth-wire/Cargo.toml | 12 +- crates/net/eth-wire/src/error.rs | 39 ++ crates/net/eth-wire/src/lib.rs | 16 +- crates/net/eth-wire/src/stream.rs | 301 ++++++++++++++ crates/net/eth-wire/src/types/broadcast.rs | 31 ++ crates/net/eth-wire/src/{ => types}/forkid.rs | 0 crates/net/eth-wire/src/types/message.rs | 367 ++++++++++++++++++ crates/net/eth-wire/src/types/mod.rs | 14 + crates/net/eth-wire/src/{ => types}/status.rs | 4 +- .../net/eth-wire/src/{ => types}/version.rs | 4 +- 18 files changed, 805 insertions(+), 27 deletions(-) create mode 100644 crates/net/eth-wire/src/error.rs create mode 100644 crates/net/eth-wire/src/stream.rs create mode 100644 crates/net/eth-wire/src/types/broadcast.rs rename crates/net/eth-wire/src/{ => types}/forkid.rs (100%) create mode 100644 crates/net/eth-wire/src/types/message.rs create mode 100644 crates/net/eth-wire/src/types/mod.rs rename crates/net/eth-wire/src/{ => types}/status.rs (99%) rename crates/net/eth-wire/src/{ => types}/version.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index c8f5aeaac..59655e448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2199,12 +2199,21 @@ dependencies = [ "bytes", "crc", "ethers-core", + "futures", "hex", "hex-literal", "maplit", + "pin-project", + "rand", + "reth-ecies", "reth-primitives", "reth-rlp", + "secp256k1", "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", ] [[package]] diff --git a/crates/common/rlp/src/encode.rs b/crates/common/rlp/src/encode.rs index 8b3017d83..89cf191e0 100644 --- a/crates/common/rlp/src/encode.rs +++ b/crates/common/rlp/src/encode.rs @@ -10,6 +10,7 @@ fn zeroless_view(v: &impl AsRef<[u8]>) -> &[u8] { } impl Header { + /// Encodes the header into the `out` buffer. pub fn encode(&self, out: &mut dyn BufMut) { if self.payload_length < 56 { let code = if self.list { EMPTY_LIST_CODE } else { EMPTY_STRING_CODE }; @@ -22,6 +23,13 @@ impl Header { out.put_slice(len_be); } } + + /// Returns the length of the encoded header + pub fn length(&self) -> usize { + let mut out = BytesMut::new(); + self.encode(&mut out); + out.len() + } } pub const fn length_of_length(payload_length: usize) -> usize { diff --git a/crates/common/rlp/src/types.rs b/crates/common/rlp/src/types.rs index 333e358e6..4cbc6f10f 100644 --- a/crates/common/rlp/src/types.rs +++ b/crates/common/rlp/src/types.rs @@ -1,4 +1,4 @@ -#[derive(Clone, Default, PartialEq)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct Header { pub list: bool, pub payload_length: usize, diff --git a/crates/net/ecies/src/codec.rs b/crates/net/ecies/src/codec.rs index 68c23e051..3765b8f1b 100644 --- a/crates/net/ecies/src/codec.rs +++ b/crates/net/ecies/src/codec.rs @@ -2,7 +2,7 @@ use crate::{ algorithm::{ECIES, MAX_BODY_SIZE}, ECIESError, EgressECIESValue, IngressECIESValue, }; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use reth_primitives::H512 as PeerId; use secp256k1::SecretKey; use std::{fmt::Debug, io}; @@ -105,7 +105,8 @@ impl Decoder for ECIESCodec { } let mut data = buf.split_to(self.ecies.body_len()); - let ret = Bytes::copy_from_slice(self.ecies.read_body(&mut data)?); + let mut ret = BytesMut::new(); + ret.extend_from_slice(self.ecies.read_body(&mut data)?); self.state = ECIESState::Header; return Ok(Some(IngressECIESValue::Message(ret))) diff --git a/crates/net/ecies/src/error.rs b/crates/net/ecies/src/error.rs index ef25040d2..45fd52544 100644 --- a/crates/net/ecies/src/error.rs +++ b/crates/net/ecies/src/error.rs @@ -1,6 +1,5 @@ -use thiserror::Error; - use crate::IngressECIESValue; +use thiserror::Error; /// An error that occurs while reading or writing to an ECIES stream. #[derive(Debug, Error)] diff --git a/crates/net/ecies/src/lib.rs b/crates/net/ecies/src/lib.rs index 268ece82d..ab53207c4 100644 --- a/crates/net/ecies/src/lib.rs +++ b/crates/net/ecies/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::result_large_err)] #![warn(missing_docs, unreachable_pub)] #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( @@ -10,7 +11,7 @@ pub mod algorithm; pub mod mac; pub mod stream; -mod util; +pub mod util; mod error; pub use error::ECIESError; @@ -38,5 +39,5 @@ pub enum IngressECIESValue { /// Receiving an ACK message Ack, /// Receiving a message - Message(bytes::Bytes), + Message(bytes::BytesMut), } diff --git a/crates/net/ecies/src/stream.rs b/crates/net/ecies/src/stream.rs index 41897f066..10aa457a3 100644 --- a/crates/net/ecies/src/stream.rs +++ b/crates/net/ecies/src/stream.rs @@ -1,5 +1,5 @@ //! The ECIES Stream implementation which wraps over [`AsyncRead`] and [`AsyncWrite`]. -use crate::{ECIESError, EgressECIESValue, IngressECIESValue}; +use crate::{codec::ECIESCodec, ECIESError, EgressECIESValue, IngressECIESValue}; use bytes::Bytes; use futures::{ready, Sink, SinkExt}; use reth_primitives::H512 as PeerId; @@ -19,8 +19,6 @@ use tokio_stream::{Stream, StreamExt}; use tokio_util::codec::{Decoder, Framed}; use tracing::{debug, instrument, trace}; -use crate::codec::ECIESCodec; - /// `ECIES` stream over TCP exchanging raw bytes #[derive(Debug)] pub struct ECIESStream { @@ -106,7 +104,7 @@ impl Stream for ECIESStream where Io: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match ready!(Pin::new(&mut self.get_mut().stream).poll_next(cx)) { @@ -156,8 +154,6 @@ mod tests { use super::*; #[tokio::test] - // TODO: implement test for the proposed - // API: https://github.com/foundry-rs/reth/issues/64#issue-1408708420 async fn can_write_and_read() { let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); let server_key = SecretKey::new(&mut rand::thread_rng()); diff --git a/crates/net/ecies/src/util.rs b/crates/net/ecies/src/util.rs index bd5ee5d97..ea4891d7f 100644 --- a/crates/net/ecies/src/util.rs +++ b/crates/net/ecies/src/util.rs @@ -1,3 +1,5 @@ +//! Utility functions for hashing and encoding. + use hmac::{Hmac, Mac}; use reth_primitives::{H256, H512 as PeerId}; use secp256k1::PublicKey; @@ -22,7 +24,7 @@ pub(crate) fn hmac_sha256(key: &[u8], input: &[&[u8]], auth_data: &[u8]) -> H256 /// Converts a [secp256k1::PublicKey] to a [PeerId] by stripping the /// SECP256K1_TAG_PUBKEY_UNCOMPRESSED tag and storing the rest of the slice in the [PeerId]. -pub(crate) fn pk2id(pk: &PublicKey) -> PeerId { +pub fn pk2id(pk: &PublicKey) -> PeerId { PeerId::from_slice(&pk.serialize_uncompressed()[1..]) } diff --git a/crates/net/eth-wire/Cargo.toml b/crates/net/eth-wire/Cargo.toml index 0d52dd072..1a358ec8c 100644 --- a/crates/net/eth-wire/Cargo.toml +++ b/crates/net/eth-wire/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "reth-eth-wire" +description = "Impements the eth/64 and eth/65 P2P protocols" version = "0.1.0" edition = "2021" license = "MIT OR Apache-2.0" @@ -12,13 +13,22 @@ hex = "0.4" thiserror = "1" # reth +reth-ecies = { path = "../ecies" } reth-primitives = { path = "../../primitives" } reth-rlp = { path = "../../common/rlp", features = ["alloc", "derive", "std", "ethereum-types"] } #used for forkid crc = "1" maplit = "1" +tokio = { version = "1.21.2", features = ["full"] } +futures = "0.3.24" +tokio-stream = "0.1.11" +secp256k1 = { version = "0.24.0", features = ["global-context", "rand-std", "recovery"] } +tokio-util = { version = "0.7.4", features = ["io"] } +pin-project = "1.0" +tracing = "0.1.37" [dev-dependencies] hex-literal = "0.3" -ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false } \ No newline at end of file +ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false } +rand = "0.8" diff --git a/crates/net/eth-wire/src/error.rs b/crates/net/eth-wire/src/error.rs new file mode 100644 index 000000000..0ee68fc19 --- /dev/null +++ b/crates/net/eth-wire/src/error.rs @@ -0,0 +1,39 @@ +//! Error cases when handling a [`crate::EthStream`] +use std::io; + +use reth_primitives::{Chain, H256}; + +use crate::types::forkid::ValidationError; + +/// Errors when sending/receiving messages +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum EthStreamError { + #[error(transparent)] + Io(#[from] io::Error), + #[error(transparent)] + Rlp(#[from] reth_rlp::DecodeError), + #[error(transparent)] + HandshakeError(#[from] HandshakeError), + #[error("message size ({0}) exceeds max length (10MB)")] + MessageTooBig(usize), +} + +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum HandshakeError { + #[error("status message can only be recv/sent in handshake")] + StatusNotInHandshake, + #[error("received non-status message when trying to handshake")] + NonStatusMessageInHandshake, + #[error("no response received when sending out handshake")] + NoResponse, + #[error(transparent)] + InvalidFork(#[from] ValidationError), + #[error("mismatched genesis in Status message. expected: {expected:?}, got: {got:?}")] + MismatchedGenesis { expected: H256, got: H256 }, + #[error("mismatched protocol version in Status message. expected: {expected:?}, got: {got:?}")] + MismatchedProtocolVersion { expected: u8, got: u8 }, + #[error("mismatched chain in Status message. expected: {expected:?}, got: {got:?}")] + MismatchedChain { expected: Chain, got: Chain }, +} diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index 8751c2a94..6d9221d16 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -4,13 +4,13 @@ no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] +//! Implementation of the `eth` wire protocol. -//! Types for the eth wire protocol. +pub use tokio_util::codec::{ + LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError, +}; +pub mod error; +mod stream; +pub mod types; -mod status; -pub use status::Status; - -mod version; -pub use version::EthVersion; - -pub mod forkid; +pub use stream::EthStream; diff --git a/crates/net/eth-wire/src/stream.rs b/crates/net/eth-wire/src/stream.rs new file mode 100644 index 000000000..95407078a --- /dev/null +++ b/crates/net/eth-wire/src/stream.rs @@ -0,0 +1,301 @@ +use crate::{ + error::{EthStreamError, HandshakeError}, + types::{forkid::ForkFilter, EthMessage, ProtocolMessage, Status}, +}; +use bytes::BytesMut; +use futures::{ready, Sink, SinkExt, StreamExt}; +use pin_project::pin_project; +use reth_rlp::{Decodable, Encodable}; +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; +use tokio_stream::Stream; + +/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message. +// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50 +const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; + +/// An `EthStream` wraps over any `Stream` that yields bytes and makes it +/// compatible with eth-networking protocol messages, which get RLP encoded/decoded. +#[pin_project] +pub struct EthStream { + #[pin] + inner: S, + /// Whether the `Status` handshake has been completed + authed: bool, +} + +impl EthStream { + /// Creates a new unauthed [`EthStream`] from a provided stream. You will need + /// to manually handshake a peer. + pub fn new(inner: S) -> Self { + Self { inner, authed: false } + } +} + +impl EthStream +where + S: Stream> + + Sink + + Unpin, +{ + /// Given an instantiated transport layer, it proceeds to return an [`EthStream`] + /// after performing a [`Status`] message handshake as specified in + pub async fn connect( + inner: S, + status: Status, + fork_filter: ForkFilter, + ) -> Result { + let mut this = Self::new(inner); + this.handshake(status, fork_filter).await?; + Ok(this) + } + + /// Performs a handshake with the connected peer over the transport stream. + pub async fn handshake( + &mut self, + status: Status, + fork_filter: ForkFilter, + ) -> Result<(), EthStreamError> { + tracing::trace!("sending eth status ..."); + self.send(EthMessage::Status(status)).await?; + + tracing::trace!("waiting for eth status from peer ..."); + let msg = self + .next() + .await + .ok_or_else(|| EthStreamError::HandshakeError(HandshakeError::NoResponse))??; + + // TODO: Add any missing checks + // https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89 + match msg { + EthMessage::Status(resp) => { + self.authed = true; + + if status.genesis != resp.genesis { + return Err(HandshakeError::MismatchedGenesis { + expected: status.genesis, + got: resp.genesis, + } + .into()) + } + + if status.version != resp.version { + return Err(HandshakeError::MismatchedProtocolVersion { + expected: status.version, + got: resp.version, + } + .into()) + } + + if status.chain != resp.chain { + return Err(HandshakeError::MismatchedChain { + expected: status.chain, + got: resp.chain, + } + .into()) + } + + Ok(fork_filter.validate(resp.forkid).map_err(HandshakeError::InvalidFork)?) + } + _ => Err(EthStreamError::HandshakeError(HandshakeError::NonStatusMessageInHandshake)), + } + } +} + +impl Stream for EthStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let res = ready!(this.inner.poll_next(cx)); + let bytes = match res { + Some(Ok(bytes)) => bytes, + Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))), + None => return Poll::Ready(None), + }; + + if bytes.len() > MAX_MESSAGE_SIZE { + return Poll::Ready(Some(Err(EthStreamError::MessageTooBig(bytes.len())))) + } + + let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) { + Ok(m) => m, + Err(err) => return Poll::Ready(Some(Err(err.into()))), + }; + + if *this.authed && matches!(msg.message, EthMessage::Status(_)) { + return Poll::Ready(Some(Err(EthStreamError::HandshakeError( + HandshakeError::StatusNotInHandshake, + )))) + } + + Poll::Ready(Some(Ok(msg.message))) + } +} + +impl Sink for EthStream +where + S: Sink + Unpin, +{ + type Error = EthStreamError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_ready(cx).map_err(Into::into) + } + + fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> { + if self.authed && matches!(item, EthMessage::Status(_)) { + return Err(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake)) + } + + let mut bytes = BytesMut::new(); + ProtocolMessage::from(item).encode(&mut bytes); + let bytes = bytes.freeze(); + + self.project().inner.start_send(bytes)?; + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx).map_err(Into::into) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx).map_err(Into::into) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + types::{broadcast::BlockHashNumber, forkid::ForkFilter, EthMessage, Status}, + EthStream, PassthroughCodec, + }; + use futures::{SinkExt, StreamExt}; + use reth_ecies::{stream::ECIESStream, util::pk2id}; + use secp256k1::{SecretKey, SECP256K1}; + use tokio::net::{TcpListener, TcpStream}; + use tokio_util::codec::Decoder; + + use crate::types::EthVersion; + use ethers_core::types::Chain; + use reth_primitives::{H256, U256}; + + #[tokio::test] + async fn can_handshake() { + let genesis = H256::random(); + let fork_filter = ForkFilter::new(0, genesis, vec![]); + + let status = Status { + version: EthVersion::Eth67 as u8, + chain: Chain::Mainnet.into(), + total_difficulty: U256::from(0), + blockhash: H256::random(), + genesis, + // Pass the current fork id. + forkid: fork_filter.current(), + }; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + + let status_clone = status; + let fork_filter_clone = fork_filter.clone(); + let handle = tokio::spawn(async move { + // roughly based off of the design of tokio::net::TcpListener + let (incoming, _) = listener.accept().await.unwrap(); + let stream = crate::PassthroughCodec::default().framed(incoming); + let _ = EthStream::connect(stream, status_clone, fork_filter_clone).await.unwrap(); + }); + + let outgoing = TcpStream::connect(local_addr).await.unwrap(); + let sink = crate::PassthroughCodec::default().framed(outgoing); + + // try to connect + let _ = EthStream::connect(sink, status, fork_filter).await.unwrap(); + + // wait for it to finish + handle.await.unwrap(); + } + + #[tokio::test] + async fn can_write_and_read_cleartext() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let test_msg = EthMessage::NewBlockHashes( + vec![ + BlockHashNumber { hash: reth_primitives::H256::random(), number: 5 }, + BlockHashNumber { hash: reth_primitives::H256::random(), number: 6 }, + ] + .into(), + ); + + let test_msg_clone = test_msg.clone(); + let handle = tokio::spawn(async move { + // roughly based off of the design of tokio::net::TcpListener + let (incoming, _) = listener.accept().await.unwrap(); + let stream = PassthroughCodec::default().framed(incoming); + let mut stream = EthStream::new(stream); + + // use the stream to get the next message + let message = stream.next().await.unwrap().unwrap(); + assert_eq!(message, test_msg_clone); + }); + + let outgoing = TcpStream::connect(local_addr).await.unwrap(); + let sink = PassthroughCodec::default().framed(outgoing); + let mut client_stream = EthStream::new(sink); + + client_stream.send(test_msg).await.unwrap(); + + // make sure the server receives the message and asserts before ending the test + handle.await.unwrap(); + } + + #[tokio::test] + async fn can_write_and_read_ecies() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let server_key = SecretKey::new(&mut rand::thread_rng()); + let test_msg = EthMessage::NewBlockHashes( + vec![ + BlockHashNumber { hash: reth_primitives::H256::random(), number: 5 }, + BlockHashNumber { hash: reth_primitives::H256::random(), number: 6 }, + ] + .into(), + ); + + let test_msg_clone = test_msg.clone(); + let handle = tokio::spawn(async move { + // roughly based off of the design of tokio::net::TcpListener + let (incoming, _) = listener.accept().await.unwrap(); + let stream = ECIESStream::incoming(incoming, server_key).await.unwrap(); + let mut stream = EthStream::new(stream); + + // use the stream to get the next message + let message = stream.next().await.unwrap().unwrap(); + assert_eq!(message, test_msg_clone); + }); + + // create the server pubkey + let server_id = pk2id(&server_key.public_key(SECP256K1)); + + let client_key = SecretKey::new(&mut rand::thread_rng()); + + let outgoing = TcpStream::connect(local_addr).await.unwrap(); + let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap(); + let mut client_stream = EthStream::new(outgoing); + + client_stream.send(test_msg).await.unwrap(); + + // make sure the server receives the message and asserts before ending the test + handle.await.unwrap(); + } +} diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs new file mode 100644 index 000000000..09e4f284e --- /dev/null +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -0,0 +1,31 @@ +//! Types for broadcasting new data. +use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper}; + +/// This informs peers of new blocks that have appeared on the network. +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)] +pub struct NewBlockHashes( + /// New block hashes and the block number for each blockhash. + /// Clients should request blocks using a [`GetBlockBodies`](crate::GetBlockBodies) message. + pub Vec, +); + +/// A block hash _and_ a block number. +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] +pub struct BlockHashNumber { + /// The block hash + pub hash: reth_primitives::H256, + /// The block number + pub number: u64, +} + +impl From> for NewBlockHashes { + fn from(v: Vec) -> Self { + NewBlockHashes(v) + } +} + +impl From for Vec { + fn from(v: NewBlockHashes) -> Self { + v.0 + } +} diff --git a/crates/net/eth-wire/src/forkid.rs b/crates/net/eth-wire/src/types/forkid.rs similarity index 100% rename from crates/net/eth-wire/src/forkid.rs rename to crates/net/eth-wire/src/types/forkid.rs diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs new file mode 100644 index 000000000..3249cc2fd --- /dev/null +++ b/crates/net/eth-wire/src/types/message.rs @@ -0,0 +1,367 @@ +#![allow(missing_docs)] +use super::{broadcast::NewBlockHashes, Status}; +use bytes::Buf; +use reth_rlp::{length_of_length, Decodable, Encodable, Header}; +use std::fmt::Debug; + +#[derive(Clone, Debug, PartialEq, Eq)] +/// An `eth` protocol message, containing a message ID and payload. +pub struct ProtocolMessage { + pub message_type: EthMessageID, + pub message: EthMessage, +} + +impl ProtocolMessage { + /// Create a new ProtocolMessage from a message type and message rlp bytes. + pub fn decode_message( + message_type: EthMessageID, + buf: &mut &[u8], + ) -> Result { + let message = match message_type { + EthMessageID::Status => EthMessage::Status(Status::decode(buf)?), + EthMessageID::NewBlockHashes => { + EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?) + } + _ => unimplemented!(), + // EthMessageID::NewBlock => EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?)), + // EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?), + // EthMessageID::NewPooledTransactionHashes => { + // EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes::decode(buf)?) + // } + // EthMessageID::GetBlockHeaders => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::GetBlockHeaders(request_pair) + // } + // EthMessageID::BlockHeaders => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::BlockHeaders(request_pair) + // } + // EthMessageID::GetBlockBodies => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::GetBlockBodies(request_pair) + // } + // EthMessageID::BlockBodies => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::BlockBodies(request_pair) + // } + // EthMessageID::GetPooledTransactions => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::GetPooledTransactions(request_pair) + // } + // EthMessageID::PooledTransactions => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::PooledTransactions(request_pair) + // } + // EthMessageID::GetNodeData => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::GetNodeData(request_pair) + // } + // EthMessageID::NodeData => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::NodeData(request_pair) + // } + // EthMessageID::GetReceipts => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::GetReceipts(request_pair) + // } + // EthMessageID::Receipts => { + // let request_pair = RequestPair::::decode(buf)?; + // EthMessage::Receipts(request_pair) + // } + }; + Ok(ProtocolMessage { message_type, message }) + } +} + +/// Encodes the protocol message into bytes. +/// The message type is encoded as a single byte and prepended to the message. +impl Encodable for ProtocolMessage { + fn length(&self) -> usize { + self.message_type.length() + self.message.length() + } + fn encode(&self, out: &mut dyn bytes::BufMut) { + self.message_type.encode(out); + self.message.encode(out); + } +} + +/// Decodes a protocol message from bytes, using the first byte to determine the message type. +/// This decodes `eth/66` request ids for each message type. +impl Decodable for ProtocolMessage { + fn decode(buf: &mut &[u8]) -> Result { + let message_type = EthMessageID::decode(buf)?; + Self::decode_message(message_type, buf) + } +} + +impl From for ProtocolMessage { + fn from(message: EthMessage) -> Self { + ProtocolMessage { message_type: message.message_id(), message } + } +} + +// TODO: determine whats up with this enum variant size warning + +/// Represents a message in the eth wire protocol, versions 66 and 67. +/// +/// The ethereum wire protocol is a set of messages that are broadcasted to the network in two +/// styles: +/// * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated +/// response message (such as [`PooledTransactions`]). +/// * A message that is broadcast to the network, without a corresponding request. +/// +/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to +/// correlate request-response message pairs. This allows for request multiplexing. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum EthMessage { + // Status is required for the protocol handshake + Status(Status), + // // The following messages are broadcast to the network + NewBlockHashes(NewBlockHashes), + // NewBlock(Box), + // Transactions(Transactions), + // NewPooledTransactionHashes(NewPooledTransactionHashes), + + // // The following messages are request-response message pairs + // GetBlockHeaders(RequestPair), + // BlockHeaders(RequestPair), + // GetBlockBodies(RequestPair), + // BlockBodies(RequestPair), + // GetPooledTransactions(RequestPair), + // PooledTransactions(RequestPair), + // GetNodeData(RequestPair), + // NodeData(RequestPair), + // GetReceipts(RequestPair), + // Receipts(RequestPair), +} + +impl EthMessage { + /// Returns the message's ID. + pub fn message_id(&self) -> EthMessageID { + match self { + EthMessage::Status(_) => EthMessageID::Status, + EthMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes, + // EthMessage::NewBlock(_) => EthMessageID::NewBlock, + // EthMessage::Transactions(_) => EthMessageID::Transactions, + // EthMessage::NewPooledTransactionHashes(_) => + // EthMessageID::NewPooledTransactionHashes, EthMessage::GetBlockHeaders(_) + // => EthMessageID::GetBlockHeaders, EthMessage::BlockHeaders(_) => + // EthMessageID::BlockHeaders, EthMessage::GetBlockBodies(_) => + // EthMessageID::GetBlockBodies, EthMessage::BlockBodies(_) => + // EthMessageID::BlockBodies, EthMessage::GetPooledTransactions(_) => + // EthMessageID::GetPooledTransactions, EthMessage::PooledTransactions(_) => + // EthMessageID::PooledTransactions, EthMessage::GetNodeData(_) => + // EthMessageID::GetNodeData, EthMessage::NodeData(_) => + // EthMessageID::NodeData, EthMessage::GetReceipts(_) => + // EthMessageID::GetReceipts, EthMessage::Receipts(_) => + // EthMessageID::Receipts, + } + } +} + +impl Encodable for EthMessage { + fn length(&self) -> usize { + match self { + EthMessage::Status(status) => status.length(), + EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(), + // EthMessage::NewBlock(new_block) => new_block.length(), + // EthMessage::Transactions(transactions) => transactions.length(), + // EthMessage::NewPooledTransactionHashes(hashes) => hashes.length(), + // EthMessage::GetBlockHeaders(request) => request.length(), + // EthMessage::BlockHeaders(headers) => headers.length(), + // EthMessage::GetBlockBodies(request) => request.length(), + // EthMessage::BlockBodies(bodies) => bodies.length(), + // EthMessage::GetPooledTransactions(request) => request.length(), + // EthMessage::PooledTransactions(transactions) => transactions.length(), + // EthMessage::GetNodeData(request) => request.length(), + // EthMessage::NodeData(data) => data.length(), + // EthMessage::GetReceipts(request) => request.length(), + // EthMessage::Receipts(receipts) => receipts.length(), + } + } + fn encode(&self, out: &mut dyn bytes::BufMut) { + match self { + EthMessage::Status(status) => status.encode(out), + EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), + // EthMessage::NewBlock(new_block) => new_block.encode(out), + // EthMessage::Transactions(transactions) => transactions.encode(out), + // EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), + // EthMessage::GetBlockHeaders(request) => request.encode(out), + // EthMessage::BlockHeaders(headers) => headers.encode(out), + // EthMessage::GetBlockBodies(request) => request.encode(out), + // EthMessage::BlockBodies(bodies) => bodies.encode(out), + // EthMessage::GetPooledTransactions(request) => request.encode(out), + // EthMessage::PooledTransactions(transactions) => transactions.encode(out), + // EthMessage::GetNodeData(request) => request.encode(out), + // EthMessage::NodeData(data) => data.encode(out), + // EthMessage::GetReceipts(request) => request.encode(out), + // EthMessage::Receipts(receipts) => receipts.encode(out), + } + } +} + +/// Represents message IDs for eth protocol messages. +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum EthMessageID { + Status = 0x00, + NewBlockHashes = 0x01, + Transactions = 0x02, + GetBlockHeaders = 0x03, + BlockHeaders = 0x04, + GetBlockBodies = 0x05, + BlockBodies = 0x06, + NewBlock = 0x07, + NewPooledTransactionHashes = 0x08, + GetPooledTransactions = 0x09, + PooledTransactions = 0x0a, + GetNodeData = 0x0d, + NodeData = 0x0e, + GetReceipts = 0x0f, + Receipts = 0x10, +} + +impl Encodable for EthMessageID { + fn length(&self) -> usize { + 1 + } + fn encode(&self, out: &mut dyn bytes::BufMut) { + out.put_u8(*self as u8); + } +} + +impl Decodable for EthMessageID { + fn decode(buf: &mut &[u8]) -> Result { + let id = buf.first().ok_or(reth_rlp::DecodeError::InputTooShort)?; + let id = match id { + 0x00 => EthMessageID::Status, + 0x01 => EthMessageID::NewBlockHashes, + 0x02 => EthMessageID::Transactions, + 0x03 => EthMessageID::GetBlockHeaders, + 0x04 => EthMessageID::BlockHeaders, + 0x05 => EthMessageID::GetBlockBodies, + 0x06 => EthMessageID::BlockBodies, + 0x07 => EthMessageID::NewBlock, + 0x08 => EthMessageID::NewPooledTransactionHashes, + 0x09 => EthMessageID::GetPooledTransactions, + 0x0a => EthMessageID::PooledTransactions, + 0x0d => EthMessageID::GetNodeData, + 0x0e => EthMessageID::NodeData, + 0x0f => EthMessageID::GetReceipts, + 0x10 => EthMessageID::Receipts, + _ => return Err(reth_rlp::DecodeError::Custom("Invalid message ID")), + }; + buf.advance(1); + Ok(id) + } +} + +impl TryFrom for EthMessageID { + type Error = &'static str; + + fn try_from(value: usize) -> Result { + match value { + 0x00 => Ok(EthMessageID::Status), + 0x01 => Ok(EthMessageID::NewBlockHashes), + 0x02 => Ok(EthMessageID::Transactions), + 0x03 => Ok(EthMessageID::GetBlockHeaders), + 0x04 => Ok(EthMessageID::BlockHeaders), + 0x05 => Ok(EthMessageID::GetBlockBodies), + 0x06 => Ok(EthMessageID::BlockBodies), + 0x07 => Ok(EthMessageID::NewBlock), + 0x08 => Ok(EthMessageID::NewPooledTransactionHashes), + 0x09 => Ok(EthMessageID::GetPooledTransactions), + 0x0a => Ok(EthMessageID::PooledTransactions), + 0x0d => Ok(EthMessageID::GetNodeData), + 0x0e => Ok(EthMessageID::NodeData), + 0x0f => Ok(EthMessageID::GetReceipts), + 0x10 => Ok(EthMessageID::Receipts), + _ => Err("Invalid message ID"), + } + } +} + +/// This is used for all request-response style `eth` protocol messages. +/// This can represent either a request or a response, since both include a message payload and +/// request id. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RequestPair { + /// id for the contained request or response message + pub request_id: u64, + + /// the request or response message payload + pub message: T, +} + +/// Allows messages with request ids to be serialized into RLP bytes. +impl Encodable for RequestPair +where + T: Encodable, +{ + fn length(&self) -> usize { + let mut length = 0; + length += self.request_id.length(); + length += self.message.length(); + length += length_of_length(length); + length + } + + fn encode(&self, out: &mut dyn reth_rlp::BufMut) { + let header = + Header { list: true, payload_length: self.request_id.length() + self.message.length() }; + + header.encode(out); + self.request_id.encode(out); + self.message.encode(out); + } +} + +/// Allows messages with request ids to be deserialized into RLP bytes. +impl Decodable for RequestPair +where + T: Decodable, +{ + fn decode(buf: &mut &[u8]) -> Result { + let _header = Header::decode(buf)?; + Ok(Self { request_id: u64::decode(buf)?, message: T::decode(buf)? }) + } +} + +#[cfg(test)] +mod test { + use crate::types::message::RequestPair; + use hex_literal::hex; + use reth_rlp::{Decodable, Encodable}; + + fn encode(value: T) -> Vec { + let mut buf = vec![]; + value.encode(&mut buf); + buf + } + + #[test] + fn request_pair_encode() { + let request_pair = RequestPair { request_id: 1337, message: vec![5u8] }; + + // c5: start of list (c0) + len(full_list) (length is <55 bytes) + // 82: 0x80 + len(1337) + // 05 39: 1337 (request_id) + // === full_list === + // c1: start of list (c0) + len(list) (length is <55 bytes) + // 05: 5 (message) + let expected = hex!("c5820539c105"); + let got = encode(request_pair); + assert_eq!(expected[..], got, "expected: {:X?}, got: {:X?}", expected, got,); + } + + #[test] + fn request_pair_decode() { + let raw_pair = &hex!("c5820539c105")[..]; + + let expected = RequestPair { request_id: 1337, message: vec![5u8] }; + + let got = RequestPair::>::decode(&mut &*raw_pair).unwrap(); + assert_eq!(expected.length(), raw_pair.len()); + assert_eq!(expected, got); + } +} diff --git a/crates/net/eth-wire/src/types/mod.rs b/crates/net/eth-wire/src/types/mod.rs new file mode 100644 index 000000000..4f42c5e67 --- /dev/null +++ b/crates/net/eth-wire/src/types/mod.rs @@ -0,0 +1,14 @@ +//! Types for the eth wire protocol. + +mod status; +pub use status::Status; + +mod version; +pub use version::EthVersion; + +pub mod forkid; + +pub mod message; +pub use message::{EthMessage, EthMessageID, ProtocolMessage}; + +pub mod broadcast; diff --git a/crates/net/eth-wire/src/status.rs b/crates/net/eth-wire/src/types/status.rs similarity index 99% rename from crates/net/eth-wire/src/status.rs rename to crates/net/eth-wire/src/types/status.rs index dc9cd48b4..6b9681618 100644 --- a/crates/net/eth-wire/src/status.rs +++ b/crates/net/eth-wire/src/types/status.rs @@ -1,4 +1,4 @@ -use crate::forkid::ForkId; +use super::forkid::ForkId; use reth_primitives::{Chain, H256, U256}; use reth_rlp::{RlpDecodable, RlpEncodable}; use std::fmt::{Debug, Display}; @@ -92,7 +92,7 @@ mod tests { use reth_rlp::{Decodable, Encodable}; use std::str::FromStr; - use crate::{ + use crate::types::{ forkid::{ForkHash, ForkId}, EthVersion, Status, }; diff --git a/crates/net/eth-wire/src/version.rs b/crates/net/eth-wire/src/types/version.rs similarity index 96% rename from crates/net/eth-wire/src/version.rs rename to crates/net/eth-wire/src/types/version.rs index 8bf2ba8b1..cd29c49d8 100644 --- a/crates/net/eth-wire/src/version.rs +++ b/crates/net/eth-wire/src/types/version.rs @@ -20,7 +20,7 @@ pub enum EthVersion { /// /// # Example /// ``` -/// use reth_eth_wire::EthVersion; +/// use reth_eth_wire::types::EthVersion; /// /// let version = EthVersion::try_from("67").unwrap(); /// assert_eq!(version, EthVersion::Eth67); @@ -42,7 +42,7 @@ impl TryFrom<&str> for EthVersion { /// /// # Example /// ``` -/// use reth_eth_wire::EthVersion; +/// use reth_eth_wire::types::EthVersion; /// /// let version = EthVersion::try_from(67).unwrap(); /// assert_eq!(version, EthVersion::Eth67);