feature: eth-wire types standalone crate (#7373)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
0xAtreides
2024-04-01 18:29:15 +01:00
committed by GitHub
parent 9e55ba6d13
commit 1ad13b95d8
21 changed files with 237 additions and 81 deletions

View File

@ -14,8 +14,7 @@ workspace = true
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-network-api.workspace = true
# TODO(onbjerg): We only need this for [BlockBody]
reth-eth-wire.workspace = true
reth-eth-wire-types.workspace = true
# async
futures.workspace = true
@ -43,4 +42,4 @@ secp256k1 = { workspace = true, features = ["alloc", "recovery", "rand"] }
[features]
test-utils = ["secp256k1", "rand", "parking_lot"]
cli = ["clap"]
optimism = ["reth-eth-wire/optimism"]
optimism = ["reth-eth-wire-types/optimism"]

View File

@ -1,6 +1,6 @@
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use futures::{Future, FutureExt};
pub use reth_eth_wire::BlockHeaders;
pub use reth_eth_wire_types::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, Header, HeadersDirection};
use std::{
fmt::Debug,

View File

@ -0,0 +1,51 @@
[package]
name = "reth-eth-wire-types"
description = "types for eth-wire"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
exclude.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-codecs.workspace = true
reth-primitives.workspace = true
alloy-rlp = { workspace = true, features = ["derive"] }
alloy-chains = { workspace = true, features = ["serde", "rlp", "arbitrary"] }
bytes.workspace = true
derive_more.workspace = true
thiserror.workspace = true
serde = { workspace = true, optional = true }
# arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
[dev-dependencies]
reth-net-common.workspace = true
reth-primitives = { workspace = true, features = ["arbitrary"] }
reth-tracing.workspace = true
test-fuzz.workspace = true
tokio-util = { workspace = true, features = ["io", "codec"] }
rand.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }
arbitrary = { workspace = true, features = ["derive"] }
proptest.workspace = true
proptest-derive.workspace = true
async-stream = "0.3"
[features]
default = ["serde"]
serde = ["dep:serde"]
arbitrary = ["reth-primitives/arbitrary", "dep:arbitrary", "dep:proptest", "dep:proptest-derive"]
optimism = ["reth-primitives/optimism"]

View File

@ -128,9 +128,7 @@ impl From<Vec<BlockBody>> for BlockBodies {
#[cfg(test)]
mod tests {
use crate::types::{
message::RequestPair, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
};
use crate::{message::RequestPair, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::{
hex, BlockHashOrNumber, Header, HeadersDirection, Signature, Transaction, TransactionKind,

View File

@ -0,0 +1,33 @@
//! Types for the eth wire protocol: <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod status;
pub use status::{Status, StatusBuilder};
pub mod version;
pub use version::EthVersion;
pub mod message;
pub use message::{EthMessage, EthMessageID, ProtocolMessage};
pub mod blocks;
pub use blocks::*;
pub mod broadcast;
pub use broadcast::*;
pub mod transactions;
pub use transactions::*;
pub mod state;
pub use state::*;
pub mod receipts;
pub use receipts::*;

View File

@ -11,7 +11,8 @@ use super::{
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, Transactions,
};
use crate::{errors::EthStreamError, EthVersion, SharedTransactions};
use crate::{EthVersion, SharedTransactions};
use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
use reth_primitives::bytes::{Buf, BufMut};
#[cfg(feature = "serde")]
@ -22,6 +23,17 @@ use std::{fmt::Debug, sync::Arc};
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// Error when sending/receiving a message
#[derive(thiserror::Error, Debug)]
pub enum MessageError {
/// Flags an unrecognized message ID for a given protocol version.
#[error("message id {1:?} is invalid for version {0:?}")]
Invalid(EthVersion, EthMessageID),
/// Thrown when rlp decoding a message message failed.
#[error("RLP error: {0}")]
RlpError(#[from] alloy_rlp::Error),
}
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@ -34,7 +46,7 @@ pub struct ProtocolMessage {
impl ProtocolMessage {
/// Create a new ProtocolMessage from a message type and message rlp bytes.
pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, EthStreamError> {
pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
let message_type = EthMessageID::decode(buf)?;
let message = match message_type {
@ -81,20 +93,14 @@ impl ProtocolMessage {
}
EthMessageID::GetNodeData => {
if version >= EthVersion::Eth67 {
return Err(EthStreamError::EthInvalidMessageError(
version,
EthMessageID::GetNodeData,
))
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
}
let request_pair = RequestPair::<GetNodeData>::decode(buf)?;
EthMessage::GetNodeData(request_pair)
}
EthMessageID::NodeData => {
if version >= EthVersion::Eth67 {
return Err(EthStreamError::EthInvalidMessageError(
version,
EthMessageID::GetNodeData,
))
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
}
let request_pair = RequestPair::<NodeData>::decode(buf)?;
EthMessage::NodeData(request_pair)
@ -487,9 +493,9 @@ where
#[cfg(test)]
mod tests {
use super::MessageError;
use crate::{
errors::EthStreamError, types::message::RequestPair, EthMessage, EthMessageID, GetNodeData,
NodeData, ProtocolMessage,
message::RequestPair, EthMessage, EthMessageID, GetNodeData, NodeData, ProtocolMessage,
};
use alloy_rlp::{Decodable, Encodable, Error};
use reth_primitives::hex;
@ -509,14 +515,14 @@ mod tests {
message: get_node_data,
});
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..))));
assert!(matches!(msg, Err(MessageError::Invalid(..))));
let node_data =
EthMessage::NodeData(RequestPair { request_id: 1337, message: NodeData(vec![]) });
let buf =
encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..))));
assert!(matches!(msg, Err(MessageError::Invalid(..))));
}
#[test]

View File

@ -34,10 +34,7 @@ pub struct Receipts(
#[cfg(test)]
mod tests {
use crate::{
types::{message::RequestPair, GetReceipts},
Receipts,
};
use crate::{message::RequestPair, GetReceipts, Receipts};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::{hex, Log, Receipt, ReceiptWithBloom, TxType};

View File

@ -1,4 +1,4 @@
use crate::{EthVersion, StatusBuilder};
use crate::EthVersion;
use alloy_chains::{Chain, NamedChain};
use alloy_rlp::{RlpDecodable, RlpEncodable};
use reth_codecs::derive_arbitrary;
@ -147,9 +147,86 @@ impl Default for Status {
}
}
/// Builder for [`Status`] messages.
///
/// # Example
/// ```
/// use reth_eth_wire_types::{EthVersion, Status};
/// use reth_primitives::{Chain, Hardfork, B256, MAINNET, MAINNET_GENESIS_HASH, U256};
///
/// // this is just an example status message!
/// let status = Status::builder()
/// .version(EthVersion::Eth66.into())
/// .chain(Chain::mainnet())
/// .total_difficulty(U256::from(100))
/// .blockhash(B256::from(MAINNET_GENESIS_HASH))
/// .genesis(B256::from(MAINNET_GENESIS_HASH))
/// .forkid(MAINNET.hardfork_fork_id(Hardfork::Paris).unwrap())
/// .build();
///
/// assert_eq!(
/// status,
/// Status {
/// version: EthVersion::Eth66.into(),
/// chain: Chain::mainnet(),
/// total_difficulty: U256::from(100),
/// blockhash: B256::from(MAINNET_GENESIS_HASH),
/// genesis: B256::from(MAINNET_GENESIS_HASH),
/// forkid: MAINNET.hardfork_fork_id(Hardfork::Paris).unwrap(),
/// }
/// );
/// ```
#[derive(Debug, Default)]
pub struct StatusBuilder {
status: Status,
}
impl StatusBuilder {
/// Consumes the type and creates the actual [`Status`] message.
pub fn build(self) -> Status {
self.status
}
/// Sets the protocol version.
pub fn version(mut self, version: u8) -> Self {
self.status.version = version;
self
}
/// Sets the chain id.
pub fn chain(mut self, chain: Chain) -> Self {
self.status.chain = chain;
self
}
/// Sets the total difficulty.
pub fn total_difficulty(mut self, total_difficulty: U256) -> Self {
self.status.total_difficulty = total_difficulty;
self
}
/// Sets the block hash.
pub fn blockhash(mut self, blockhash: B256) -> Self {
self.status.blockhash = blockhash;
self
}
/// Sets the genesis hash.
pub fn genesis(mut self, genesis: B256) -> Self {
self.status.genesis = genesis;
self
}
/// Sets the fork id.
pub fn forkid(mut self, forkid: ForkId) -> Self {
self.status.forkid = forkid;
self
}
}
#[cfg(test)]
mod tests {
use crate::types::{EthVersion, Status};
use crate::{EthVersion, Status};
use alloy_chains::{Chain, NamedChain};
use alloy_rlp::{Decodable, Encodable};
use rand::Rng;

View File

@ -1,5 +1,4 @@
//! Support for representing the version of the `eth`. [`Capability`](crate::capability::Capability)
//! and [Protocol](crate::protocol::Protocol).
//! Support for representing the version of the `eth`
use std::str::FromStr;
@ -59,7 +58,7 @@ impl EthVersion {
///
/// # Example
/// ```
/// use reth_eth_wire::types::EthVersion;
/// use reth_eth_wire_types::EthVersion;
///
/// let version = EthVersion::try_from("67").unwrap();
/// assert_eq!(version, EthVersion::Eth67);
@ -82,7 +81,7 @@ impl TryFrom<&str> for EthVersion {
///
/// # Example
/// ```
/// use reth_eth_wire::types::EthVersion;
/// use reth_eth_wire_types::EthVersion;
///
/// let version = EthVersion::try_from(67).unwrap();
/// assert_eq!(version, EthVersion::Eth67);

View File

@ -19,6 +19,7 @@ reth-ecies.workspace = true
alloy-rlp = { workspace = true, features = ["derive"] }
alloy-chains = { workspace = true, features = ["serde", "rlp", "arbitrary"] }
reth-discv4.workspace = true
reth-eth-wire-types.workspace = true
# metrics
reth-metrics.workspace = true

View File

@ -2,8 +2,9 @@
use crate::{
errors::{MuxDemuxError, P2PStreamError},
message::MessageError,
version::ParseVersionError,
DisconnectReason, EthMessageID, EthVersion,
DisconnectReason,
};
use alloy_chains::Chain;
use reth_primitives::{GotExpected, GotExpectedBoxed, ValidationError, B256};
@ -24,9 +25,9 @@ pub enum EthStreamError {
#[error(transparent)]
/// Failed Ethereum handshake.
EthHandshakeError(#[from] EthHandshakeError),
#[error("message id {1:?} is invalid for version {0:?}")]
/// Flags an unrecognized message ID for a given protocol version.
EthInvalidMessageError(EthVersion, EthMessageID),
/// Thrown when decoding a message message failed.
#[error(transparent)]
InvalidMessage(#[from] MessageError),
#[error("message size ({0}) exceeds max length (10MB)")]
/// Received a message whose size exceeds the standard limit.
MessageTooBig(usize),
@ -40,8 +41,8 @@ pub enum EthStreamError {
/// The number of transaction sizes.
sizes_len: usize,
},
/// Error when data is not recieved from peer for a prolonged period.
#[error("never recieved data from remote peer")]
/// Error when data is not received from peer for a prolonged period.
#[error("never received data from remote peer")]
StreamTimeout,
}
@ -74,12 +75,6 @@ impl From<io::Error> for EthStreamError {
}
}
impl From<alloy_rlp::Error> for EthStreamError {
fn from(err: alloy_rlp::Error) -> Self {
P2PStreamError::from(err).into()
}
}
/// Error that can occur during the `eth` sub-protocol handshake.
#[derive(thiserror::Error, Debug)]
pub enum EthHandshakeError {

View File

@ -2,8 +2,7 @@ use crate::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
types::{EthMessage, ProtocolMessage, Status},
CanDisconnect, DisconnectReason, EthVersion,
CanDisconnect, DisconnectReason, EthMessage, EthVersion, ProtocolMessage, Status,
};
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
@ -111,7 +110,7 @@ where
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(err)
return Err(EthStreamError::InvalidMessage(err))
}
};
@ -278,7 +277,7 @@ where
%msg,
"failed to decode protocol message"
);
return Poll::Ready(Some(Err(err)))
return Poll::Ready(Some(Err(EthStreamError::InvalidMessage(err))))
}
};
@ -347,10 +346,10 @@ where
mod tests {
use super::UnauthedEthStream;
use crate::{
broadcast::BlockHashNumber,
errors::{EthHandshakeError, EthStreamError},
p2pstream::{ProtocolVersion, UnauthedP2PStream},
types::{broadcast::BlockHashNumber, EthMessage, EthVersion, Status},
EthStream, HelloMessageWithProtocols, PassthroughCodec,
EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec, Status,
};
use alloy_chains::NamedChain;
use futures::{SinkExt, StreamExt};

View File

@ -15,7 +15,6 @@
#![allow(unknown_lints, non_local_definitions)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod builder;
pub mod capability;
mod disconnect;
pub mod errors;
@ -26,9 +25,6 @@ pub mod muxdemux;
mod p2pstream;
mod pinger;
pub mod protocol;
pub use builder::*;
pub mod types;
pub use types::*;
#[cfg(test)]
pub mod test_utils;
@ -48,5 +44,8 @@ pub use crate::{
DisconnectP2P, P2PMessage, P2PMessageID, P2PStream, ProtocolVersion, UnauthedP2PStream,
MAX_RESERVED_MESSAGE_ID,
},
types::EthVersion,
};
// Re-export wire types
#[doc(inline)]
pub use reth_eth_wire_types::*;

View File

@ -226,7 +226,7 @@ where
/// This stream emits _non-empty_ Bytes that start with the normalized message id, so that the first
/// byte of each message starts from 0. If this stream only supports a single capability, for
/// example `eth` then the first byte of each message will match
/// [EthMessageID](crate::types::EthMessageID).
/// [EthMessageID](reth_eth_wire_types::message::EthMessageID).
#[pin_project]
#[derive(Debug)]
pub struct P2PStream<S> {

View File

@ -1,25 +0,0 @@
//! Types for the eth wire protocol.
mod status;
pub use status::Status;
pub mod version;
pub use version::EthVersion;
pub mod message;
pub use message::{EthMessage, EthMessageID, ProtocolMessage};
pub mod blocks;
pub use blocks::*;
pub mod broadcast;
pub use broadcast::*;
pub mod transactions;
pub use transactions::*;
pub mod state;
pub use state::*;
pub mod receipts;
pub use receipts::*;