diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index cd2394aea..da8f7e4d1 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -64,6 +64,12 @@ impl Capability { pub fn is_eth_v67(&self) -> bool { self.name == "eth" && self.version == 67 } + + /// Whether this is eth v68. + #[inline] + pub fn is_eth_v68(&self) -> bool { + self.name == "eth" && self.version == 68 + } } #[cfg(any(test, feature = "arbitrary"))] @@ -97,6 +103,7 @@ pub struct Capabilities { inner: Vec, eth_66: bool, eth_67: bool, + eth_68: bool, } impl Capabilities { @@ -115,7 +122,7 @@ impl Capabilities { /// Whether the peer supports `eth` sub-protocol. #[inline] pub fn supports_eth(&self) -> bool { - self.eth_67 || self.eth_66 + self.eth_68 || self.eth_67 || self.eth_66 } /// Whether this peer supports eth v66 protocol. @@ -129,6 +136,12 @@ impl Capabilities { pub fn supports_eth_v67(&self) -> bool { self.eth_67 } + + /// Whether this peer supports eth v68 protocol. + #[inline] + pub fn supports_eth_v68(&self) -> bool { + self.eth_68 + } } impl From> for Capabilities { @@ -136,6 +149,7 @@ impl From> for Capabilities { Self { eth_66: value.iter().any(Capability::is_eth_v66), eth_67: value.iter().any(Capability::is_eth_v67), + eth_68: value.iter().any(Capability::is_eth_v68), inner: value, } } @@ -154,6 +168,7 @@ impl Decodable for Capabilities { Ok(Self { eth_66: inner.iter().any(Capability::is_eth_v66), eth_67: inner.iter().any(Capability::is_eth_v67), + eth_68: inner.iter().any(Capability::is_eth_v68), inner, }) } @@ -227,6 +242,15 @@ pub enum SharedCapabilityError { mod tests { use super::*; + #[test] + fn from_eth_68() { + let capability = SharedCapability::new("eth", 68, 0).unwrap(); + + assert_eq!(capability.name(), "eth"); + assert_eq!(capability.version(), 68); + assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth68, offset: 0 }); + } + #[test] fn from_eth_67() { let capability = SharedCapability::new("eth", 67, 0).unwrap(); @@ -244,4 +268,19 @@ mod tests { assert_eq!(capability.version(), 66); assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth66, offset: 0 }); } + + #[test] + fn capabilities_supports_eth() { + let capabilities: Capabilities = vec![ + Capability::new("eth".into(), 66), + Capability::new("eth".into(), 67), + Capability::new("eth".into(), 68), + ] + .into(); + + assert!(capabilities.supports_eth()); + assert!(capabilities.supports_eth_v66()); + assert!(capabilities.supports_eth_v67()); + assert!(capabilities.supports_eth_v68()); + } } diff --git a/crates/net/eth-wire/src/errors/eth.rs b/crates/net/eth-wire/src/errors/eth.rs index 6090fa40d..e1c4aa969 100644 --- a/crates/net/eth-wire/src/errors/eth.rs +++ b/crates/net/eth-wire/src/errors/eth.rs @@ -1,5 +1,7 @@ //! Error handling for (`EthStream`)[crate::EthStream] -use crate::{errors::P2PStreamError, DisconnectReason}; +use crate::{ + errors::P2PStreamError, version::ParseVersionError, DisconnectReason, EthMessageID, EthVersion, +}; use reth_primitives::{Chain, ValidationError, H256}; use std::io; @@ -10,9 +12,15 @@ pub enum EthStreamError { #[error(transparent)] P2PStreamError(#[from] P2PStreamError), #[error(transparent)] + ParseVersionError(#[from] ParseVersionError), + #[error(transparent)] EthHandshakeError(#[from] EthHandshakeError), + #[error("For {0:?} version, message id({1:?}) is invalid")] + EthInvalidMessageError(EthVersion, EthMessageID), #[error("message size ({0}) exceeds max length (10MB)")] MessageTooBig(usize), + #[error("TransactionHashes invalid len of fields: hashes_len={hashes_len} types_len={types_len} sizes_len={sizes_len}")] + TransactionHashesInvalidLenOfFields { hashes_len: usize, types_len: usize, sizes_len: usize }, } // === impl EthStreamError === diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index c0f96ecc8..c916a735e 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -2,6 +2,7 @@ use crate::{ errors::{EthHandshakeError, EthStreamError}, message::{EthBroadcastMessage, ProtocolBroadcastMessage}, types::{EthMessage, ProtocolMessage, Status}, + EthVersion, }; use futures::{ready, Sink, SinkExt, StreamExt}; use pin_project::pin_project; @@ -9,7 +10,7 @@ use reth_primitives::{ bytes::{Bytes, BytesMut}, ForkFilter, }; -use reth_rlp::{Decodable, Encodable}; +use reth_rlp::Encodable; use std::{ pin::Pin, task::{Context, Poll}, @@ -76,11 +77,12 @@ where return Err(EthStreamError::MessageTooBig(their_msg.len())) } - let msg = match ProtocolMessage::decode(&mut their_msg.as_ref()) { + let version = EthVersion::try_from(status.version)?; + let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) { Ok(m) => m, Err(err) => { - tracing::debug!("rlp decode error in eth handshake: msg={their_msg:x}"); - return Err(err.into()) + tracing::debug!("decode error in eth handshake: msg={their_msg:x}"); + return Err(err) } }; @@ -120,7 +122,7 @@ where // now we can create the `EthStream` because the peer has successfully completed // the handshake - let stream = EthStream::new(self.inner); + let stream = EthStream::new(version, self.inner); Ok((stream, resp)) } @@ -136,6 +138,7 @@ where #[pin_project] #[derive(Debug)] pub struct EthStream { + version: EthVersion, #[pin] inner: S, } @@ -143,8 +146,13 @@ pub struct EthStream { impl EthStream { /// Creates a new unauthed [`EthStream`] from a provided stream. You will need /// to manually handshake a peer. - pub fn new(inner: S) -> Self { - Self { inner } + pub fn new(version: EthVersion, inner: S) -> Self { + Self { version, inner } + } + + /// Returns the eth version. + pub fn version(&self) -> EthVersion { + self.version } /// Returns the underlying stream. @@ -203,11 +211,11 @@ where return Poll::Ready(Some(Err(EthStreamError::MessageTooBig(bytes.len())))) } - let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) { + let msg = match ProtocolMessage::decode_message(*this.version, &mut bytes.as_ref()) { Ok(m) => m, Err(err) => { - tracing::debug!("rlp decode error: msg={bytes:x}"); - return Poll::Ready(Some(Err(err.into()))) + tracing::debug!("decode error: msg={bytes:x}"); + return Poll::Ready(Some(Err(err))) } }; @@ -337,7 +345,7 @@ mod tests { // roughly based off of the design of tokio::net::TcpListener let (incoming, _) = listener.accept().await.unwrap(); let stream = PassthroughCodec::default().framed(incoming); - let mut stream = EthStream::new(stream); + let mut stream = EthStream::new(EthVersion::Eth67, stream); // use the stream to get the next message let message = stream.next().await.unwrap().unwrap(); @@ -346,7 +354,7 @@ mod tests { let outgoing = TcpStream::connect(local_addr).await.unwrap(); let sink = PassthroughCodec::default().framed(outgoing); - let mut client_stream = EthStream::new(sink); + let mut client_stream = EthStream::new(EthVersion::Eth67, sink); client_stream.send(test_msg).await.unwrap(); @@ -372,7 +380,7 @@ mod tests { // roughly based off of the design of tokio::net::TcpListener let (incoming, _) = listener.accept().await.unwrap(); let stream = ECIESStream::incoming(incoming, server_key).await.unwrap(); - let mut stream = EthStream::new(stream); + let mut stream = EthStream::new(EthVersion::Eth67, stream); // use the stream to get the next message let message = stream.next().await.unwrap().unwrap(); @@ -386,7 +394,7 @@ mod tests { let outgoing = TcpStream::connect(local_addr).await.unwrap(); let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap(); - let mut client_stream = EthStream::new(outgoing); + let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing); client_stream.send(test_msg).await.unwrap(); diff --git a/crates/net/eth-wire/src/hello.rs b/crates/net/eth-wire/src/hello.rs index 3024f7b65..ee9e99e2d 100644 --- a/crates/net/eth-wire/src/hello.rs +++ b/crates/net/eth-wire/src/hello.rs @@ -100,7 +100,7 @@ impl HelloMessageBuilder { protocol_version: protocol_version.unwrap_or_default(), client_version: client_version.unwrap_or_else(|| DEFAULT_CLIENT_VERSION.to_string()), capabilities: capabilities - .unwrap_or_else(|| vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()]), + .unwrap_or_else(|| vec![EthVersion::Eth67.into(), EthVersion::Eth66.into()]), port: port.unwrap_or(30303), id, } diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index e7e92389a..6bbf31e23 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -948,7 +948,7 @@ mod tests { #[test] fn test_peer_lower_capability_version() { let local_capabilities: Vec = - vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()]; + vec![EthVersion::Eth66.into(), EthVersion::Eth67.into(), EthVersion::Eth68.into()]; let peer_capabilities: Vec = vec![EthVersion::Eth66.into()]; let shared_capability = diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 27a74cce4..581387389 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -104,19 +104,33 @@ pub struct SharedTransactions( #[derive_arbitrary(rlp)] #[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct NewPooledTransactionHashes( +pub struct NewPooledTransactionHashes66( /// Transaction hashes for new transactions that have appeared on the network. /// Clients should request the transactions with the given hashes using a /// [`GetPooledTransactions`](crate::GetPooledTransactions) message. pub Vec, ); -impl From> for NewPooledTransactionHashes { +impl From> for NewPooledTransactionHashes66 { fn from(v: Vec) -> Self { - NewPooledTransactionHashes(v) + NewPooledTransactionHashes66(v) } } +/// Same as [`NewPooledTransactionHashes66`] but extends that that beside the transaction hashes, +/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well. +#[derive_arbitrary(rlp)] +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct NewPooledTransactionHashes68 { + /// Transaction types for new transactions that have appeared on the network. + pub types: Vec, + /// Transaction sizes for new transactions that have appeared on the network. + pub sizes: Vec, + /// Transaction hashes for new transactions that have appeared on the network. + pub hashes: Vec, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs index e52392d7b..82bc974ca 100644 --- a/crates/net/eth-wire/src/types/message.rs +++ b/crates/net/eth-wire/src/types/message.rs @@ -1,10 +1,10 @@ #![allow(missing_docs)] use super::{ broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, - GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes, - NodeData, PooledTransactions, Receipts, Status, Transactions, + GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66, + NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, Transactions, }; -use crate::SharedTransactions; +use crate::{errors::EthStreamError, EthVersion, SharedTransactions}; use reth_primitives::bytes::{Buf, BufMut}; use reth_rlp::{length_of_length, Decodable, Encodable, Header}; use std::{fmt::Debug, sync::Arc}; @@ -22,10 +22,9 @@ pub struct ProtocolMessage { impl ProtocolMessage { /// Create a new ProtocolMessage from a message type and message rlp bytes. - pub fn decode_message( - message_type: EthMessageID, - buf: &mut &[u8], - ) -> Result { + pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result { + let message_type = EthMessageID::decode(buf)?; + let message = match message_type { EthMessageID::Status => EthMessage::Status(Status::decode(buf)?), EthMessageID::NewBlockHashes => { @@ -34,7 +33,15 @@ impl ProtocolMessage { EthMessageID::NewBlock => EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?)), EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?), EthMessageID::NewPooledTransactionHashes => { - EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes::decode(buf)?) + if version >= EthVersion::Eth68 { + EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode( + buf, + )?) + } else { + EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode( + buf, + )?) + } } EthMessageID::GetBlockHeaders => { let request_pair = RequestPair::::decode(buf)?; @@ -61,10 +68,22 @@ impl ProtocolMessage { EthMessage::PooledTransactions(request_pair) } EthMessageID::GetNodeData => { + if version >= EthVersion::Eth67 { + return Err(EthStreamError::EthInvalidMessageError( + version, + EthMessageID::GetNodeData, + )) + } let request_pair = RequestPair::::decode(buf)?; EthMessage::GetNodeData(request_pair) } EthMessageID::NodeData => { + if version >= EthVersion::Eth67 { + return Err(EthStreamError::EthInvalidMessageError( + version, + EthMessageID::GetNodeData, + )) + } let request_pair = RequestPair::::decode(buf)?; EthMessage::NodeData(request_pair) } @@ -93,15 +112,6 @@ impl Encodable for ProtocolMessage { } } -/// Decodes a protocol message from bytes, using the first byte to determine the message type. -/// This decodes `eth/66` request ids for each message type. -impl Decodable for ProtocolMessage { - fn decode(buf: &mut &[u8]) -> Result { - let message_type = EthMessageID::decode(buf)?; - Self::decode_message(message_type, buf) - } -} - impl From for ProtocolMessage { fn from(message: EthMessage) -> Self { ProtocolMessage { message_type: message.message_id(), message } @@ -133,7 +143,7 @@ impl From for ProtocolBroadcastMessage { } } -/// Represents a message in the eth wire protocol, versions 66 and 67. +/// Represents a message in the eth wire protocol, versions 66, 67 and 68. /// /// The ethereum wire protocol is a set of messages that are broadcasted to the network in two /// styles: @@ -141,8 +151,15 @@ impl From for ProtocolBroadcastMessage { /// response message (such as [`PooledTransactions`]). /// * A message that is broadcast to the network, without a corresponding request. /// -/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to -/// correlate request-response message pairs. This allows for request multiplexing. +/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to +/// correlate request-response message pairs. This allows for request multiplexing. +/// +/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and +/// [``NodeData]. +/// +/// The `eth/68` changes only NewPooledTransactionHashes to include `types` and `sized`. For +/// it, NewPooledTransactionHashes is renamed as [`NewPooledTransactionHashes66`] and +/// [`NewPooledTransactionHashes68`] is defined. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum EthMessage { @@ -152,7 +169,8 @@ pub enum EthMessage { NewBlockHashes(NewBlockHashes), NewBlock(Box), Transactions(Transactions), - NewPooledTransactionHashes(NewPooledTransactionHashes), + NewPooledTransactionHashes66(NewPooledTransactionHashes66), + NewPooledTransactionHashes68(NewPooledTransactionHashes68), // The following messages are request-response message pairs GetBlockHeaders(RequestPair), @@ -175,7 +193,8 @@ impl EthMessage { EthMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes, EthMessage::NewBlock(_) => EthMessageID::NewBlock, EthMessage::Transactions(_) => EthMessageID::Transactions, - EthMessage::NewPooledTransactionHashes(_) => EthMessageID::NewPooledTransactionHashes, + EthMessage::NewPooledTransactionHashes66(_) | + EthMessage::NewPooledTransactionHashes68(_) => EthMessageID::NewPooledTransactionHashes, EthMessage::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders, EthMessage::BlockHeaders(_) => EthMessageID::BlockHeaders, EthMessage::GetBlockBodies(_) => EthMessageID::GetBlockBodies, @@ -197,7 +216,8 @@ impl Encodable for EthMessage { EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), EthMessage::NewBlock(new_block) => new_block.encode(out), EthMessage::Transactions(transactions) => transactions.encode(out), - EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), + EthMessage::NewPooledTransactionHashes66(hashes) => hashes.encode(out), + EthMessage::NewPooledTransactionHashes68(hashes) => hashes.encode(out), EthMessage::GetBlockHeaders(request) => request.encode(out), EthMessage::BlockHeaders(headers) => headers.encode(out), EthMessage::GetBlockBodies(request) => request.encode(out), @@ -216,7 +236,8 @@ impl Encodable for EthMessage { EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(), EthMessage::NewBlock(new_block) => new_block.length(), EthMessage::Transactions(transactions) => transactions.length(), - EthMessage::NewPooledTransactionHashes(hashes) => hashes.length(), + EthMessage::NewPooledTransactionHashes66(hashes) => hashes.length(), + EthMessage::NewPooledTransactionHashes68(hashes) => hashes.length(), EthMessage::GetBlockHeaders(request) => request.length(), EthMessage::BlockHeaders(headers) => headers.length(), EthMessage::GetBlockBodies(request) => request.length(), @@ -403,7 +424,10 @@ where #[cfg(test)] mod test { - use crate::types::message::RequestPair; + use crate::{ + errors::EthStreamError, types::message::RequestPair, EthMessage, EthMessageID, GetNodeData, + NodeData, ProtocolMessage, + }; use hex_literal::hex; use reth_rlp::{Decodable, Encodable}; @@ -413,6 +437,25 @@ mod test { buf } + #[test] + fn test_removed_message_at_eth67() { + let get_node_data = + EthMessage::GetNodeData(RequestPair { request_id: 1337, message: GetNodeData(vec![]) }); + let buf = encode(ProtocolMessage { + message_type: EthMessageID::GetNodeData, + message: get_node_data, + }); + let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]); + assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..)))); + + let node_data = + EthMessage::NodeData(RequestPair { request_id: 1337, message: NodeData(vec![]) }); + let buf = + encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data }); + let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]); + assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..)))); + } + #[test] fn request_pair_encode() { let request_pair = RequestPair { request_id: 1337, message: vec![5u8] }; diff --git a/crates/net/eth-wire/src/types/version.rs b/crates/net/eth-wire/src/types/version.rs index 8db1759a1..2f84b969e 100644 --- a/crates/net/eth-wire/src/types/version.rs +++ b/crates/net/eth-wire/src/types/version.rs @@ -11,13 +11,16 @@ pub struct ParseVersionError(String); /// The `eth` protocol version. #[repr(u8)] -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum EthVersion { /// The `eth` protocol version 66. Eth66 = 66, /// The `eth` protocol version 67. Eth67 = 67, + + /// The `eth` protocol version 68. + Eth68 = 68, } impl EthVersion { @@ -25,8 +28,8 @@ impl EthVersion { pub fn total_messages(&self) -> u8 { match self { EthVersion::Eth66 => 15, - EthVersion::Eth67 => { - // eth/67 is eth/66 minus GetNodeData and NodeData messages + EthVersion::Eth67 | EthVersion::Eth68 => { + // eth/67,68 are eth/66 minus GetNodeData and NodeData messages 13 } } @@ -50,6 +53,7 @@ impl TryFrom<&str> for EthVersion { match s { "66" => Ok(EthVersion::Eth66), "67" => Ok(EthVersion::Eth67), + "68" => Ok(EthVersion::Eth68), _ => Err(ParseVersionError(s.to_string())), } } @@ -72,6 +76,7 @@ impl TryFrom for EthVersion { match u { 66 => Ok(EthVersion::Eth66), 67 => Ok(EthVersion::Eth67), + 68 => Ok(EthVersion::Eth68), _ => Err(ParseVersionError(u.to_string())), } } @@ -99,6 +104,7 @@ impl From for &'static str { match v { EthVersion::Eth66 => "66", EthVersion::Eth67 => "67", + EthVersion::Eth68 => "68", } } } @@ -119,13 +125,15 @@ mod test { fn test_eth_version_try_from_str() { assert_eq!(EthVersion::Eth66, EthVersion::try_from("66").unwrap()); assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap()); - assert_eq!(Err(ParseVersionError("68".to_string())), EthVersion::try_from("68")); + assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap()); + assert_eq!(Err(ParseVersionError("69".to_string())), EthVersion::try_from("69")); } #[test] fn test_eth_version_from_str() { assert_eq!(EthVersion::Eth66, "66".parse().unwrap()); assert_eq!(EthVersion::Eth67, "67".parse().unwrap()); - assert_eq!(Err(ParseVersionError("68".to_string())), "68".parse::()); + assert_eq!(EthVersion::Eth68, "68".parse().unwrap()); + assert_eq!(Err(ParseVersionError("69".to_string())), "69".parse::()); } } diff --git a/crates/net/eth-wire/tests/fuzz_roundtrip.rs b/crates/net/eth-wire/tests/fuzz_roundtrip.rs index e57c329c2..7dad65196 100644 --- a/crates/net/eth-wire/tests/fuzz_roundtrip.rs +++ b/crates/net/eth-wire/tests/fuzz_roundtrip.rs @@ -51,8 +51,8 @@ pub mod fuzz_rlp { use reth_eth_wire::{ BlockBodies, BlockHeaders, DisconnectReason, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, HelloMessage, NewBlock, NewBlockHashes, - NewPooledTransactionHashes, NodeData, P2PMessage, PooledTransactions, Receipts, Status, - Transactions, + NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData, P2PMessage, + PooledTransactions, Receipts, Status, Transactions, }; use reth_primitives::{BlockHashOrNumber, TransactionSigned}; use reth_rlp::{RlpDecodableWrapper, RlpEncodableWrapper}; @@ -137,7 +137,8 @@ pub mod fuzz_rlp { fuzz_type_and_name!(GetBlockBodies, fuzz_GetBlockBodies); fuzz_type_and_name!(BlockBodies, fuzz_BlockBodies); fuzz_type_and_name!(NewBlock, fuzz_NewBlock); - fuzz_type_and_name!(NewPooledTransactionHashes, fuzz_NewPooledTransactionHashes); + fuzz_type_and_name!(NewPooledTransactionHashes66, fuzz_NewPooledTransactionHashes66); + fuzz_type_and_name!(NewPooledTransactionHashes68, fuzz_NewPooledTransactionHashes68); fuzz_type_and_name!(GetPooledTransactions, fuzz_GetPooledTransactions); fuzz_type_and_name!(PooledTransactions, fuzz_PooledTransactions); fuzz_type_and_name!(GetNodeData, fuzz_GetNodeData); diff --git a/crates/net/eth-wire/tests/new_pooled_transactions.rs b/crates/net/eth-wire/tests/new_pooled_transactions.rs index 8272c5c73..9f767c257 100644 --- a/crates/net/eth-wire/tests/new_pooled_transactions.rs +++ b/crates/net/eth-wire/tests/new_pooled_transactions.rs @@ -1,5 +1,5 @@ //! Decoding tests for [`NewPooledTransactions`] -use reth_eth_wire::NewPooledTransactionHashes; +use reth_eth_wire::NewPooledTransactionHashes66; use reth_primitives::hex; use reth_rlp::Decodable; use std::{fs, path::PathBuf}; @@ -10,5 +10,5 @@ fn decode_new_pooled_transaction_hashes_network() { .join("testdata/new_pooled_transactions_network_rlp"); let data = fs::read_to_string(network_data_path).expect("Unable to read file"); let hex_data = hex::decode(data.trim()).unwrap(); - let _txs = NewPooledTransactionHashes::decode(&mut &hex_data[..]).unwrap(); + let _txs = NewPooledTransactionHashes66::decode(&mut &hex_data[..]).unwrap(); } diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index f2027c074..66f02863f 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -7,7 +7,7 @@ use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, - NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, + NewBlock, NewBlockHashes, NewPooledTransactionHashes66, NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; @@ -50,7 +50,7 @@ pub enum PeerMessage { /// Broadcast transactions _from_ local _to_ a peer. SendTransactions(SharedTransactions), /// Send new pooled transactions - PooledTransactions(NewPooledTransactionHashes), + PooledTransactions(NewPooledTransactionHashes66), /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 52a21e13e..02024d9ae 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,7 +4,7 @@ use crate::{ }; use async_trait::async_trait; use parking_lot::Mutex; -use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; +use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes66, SharedTransactions}; use reth_interfaces::{ p2p::headers::client::StatusUpdater, sync::{SyncState, SyncStateProvider, SyncStateUpdater}, @@ -144,7 +144,7 @@ impl NetworkHandle { pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: Vec) { self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, - msg: NewPooledTransactionHashes(msg), + msg: NewPooledTransactionHashes66(msg), }) } @@ -292,7 +292,7 @@ pub(crate) enum NetworkHandleMessage { /// Sends the list of transactions to the given peer. SendTransaction { peer_id: PeerId, msg: SharedTransactions }, /// Sends the list of transactions hashes to the given peer. - SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, + SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 }, /// Send an `eth` protocol request to the peer. EthRequest { /// The peer to send the request to. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index c910ad9f6..8d58d7750 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -16,7 +16,7 @@ use reth_eth_wire::{ capability::Capabilities, errors::{EthHandshakeError, EthStreamError, P2PStreamError}, message::{EthBroadcastMessage, RequestPair}, - DisconnectReason, EthMessage, EthStream, P2PStream, + DisconnectReason, EthMessage, EthStream, EthVersion, P2PStream, }; use reth_interfaces::p2p::error::RequestError; use reth_metrics_common::metered_sender::MeteredSender; @@ -178,9 +178,23 @@ impl ActiveSession { EthMessage::Transactions(msg) => { self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into() } - EthMessage::NewPooledTransactionHashes(msg) => { + EthMessage::NewPooledTransactionHashes66(msg) => { self.try_emit_broadcast(PeerMessage::PooledTransactions(msg)).into() } + EthMessage::NewPooledTransactionHashes68(msg) => { + if msg.hashes.len() != msg.types.len() || msg.hashes.len() != msg.sizes.len() { + return OnIncomingMessageOutcome::BadMessage { + error: EthStreamError::TransactionHashesInvalidLenOfFields { + hashes_len: msg.hashes.len(), + types_len: msg.types.len(), + sizes_len: msg.sizes.len(), + }, + message: EthMessage::NewPooledTransactionHashes68(msg), + } + } + // TODO revise `PeerMessage::PooledTransactions` to have `types` and `sizes` + self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.hashes.into())).into() + } EthMessage::GetBlockHeaders(req) => { on_request!(req, BlockHeaders, GetBlockHeaders) } @@ -237,7 +251,13 @@ impl ActiveSession { self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into()); } PeerMessage::PooledTransactions(msg) => { - self.queued_outgoing.push_back(EthMessage::NewPooledTransactionHashes(msg).into()); + if self.conn.version() >= EthVersion::Eth68 { + // TODO + // we don't know types and sizes yet + } else { + self.queued_outgoing + .push_back(EthMessage::NewPooledTransactionHashes66(msg).into()); + } } PeerMessage::EthRequest(req) => { let deadline = self.request_deadline(); @@ -699,8 +719,8 @@ mod tests { }; use reth_ecies::util::pk2id; use reth_eth_wire::{ - EthVersion, GetBlockBodies, HelloMessage, NewPooledTransactionHashes, ProtocolVersion, - Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream, + EthVersion, GetBlockBodies, HelloMessage, ProtocolVersion, Status, StatusBuilder, + UnauthedEthStream, UnauthedP2PStream, }; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_primitives::{ForkFilter, Hardfork, MAINNET}; @@ -929,9 +949,7 @@ mod tests { let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move { for _ in 0..num_messages { client_stream - .send(EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes( - vec![], - ))) + .send(EthMessage::NewPooledTransactionHashes66(Vec::new().into())) .await .unwrap(); } @@ -1030,7 +1048,7 @@ mod tests { let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move { client_stream - .send(EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes(vec![]))) + .send(EthMessage::NewPooledTransactionHashes66(Vec::new().into())) .await .unwrap(); let _ = tokio::time::timeout(Duration::from_secs(100), client_stream.next()).await; diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index fc31517d8..ef6ebadea 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -10,7 +10,7 @@ use crate::{ }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use reth_eth_wire::{ - GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, + GetPooledTransactions, NewPooledTransactionHashes66, PooledTransactions, Transactions, }; use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider}; use reth_network_api::{Peers, ReputationChangeKind}; @@ -249,7 +249,7 @@ where fn on_new_pooled_transaction_hashes( &mut self, peer_id: PeerId, - msg: NewPooledTransactionHashes, + msg: NewPooledTransactionHashes66, ) { // If the node is currently syncing, ignore transactions if self.network.is_syncing() { @@ -338,7 +338,7 @@ where // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the // pool if !self.network.is_syncing() { - let msg = NewPooledTransactionHashes(self.pool.pooled_transactions()); + let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions()); self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg, @@ -546,7 +546,7 @@ pub enum NetworkTransactionEvent { /// Received list of transactions from the given peer. IncomingTransactions { peer_id: PeerId, msg: Transactions }, /// Received list of transactions hashes to the given peer. - IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, + IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 }, /// Incoming `GetPooledTransactions` request from a peer. GetPooledTransactions { peer_id: PeerId,