feat(net): support eth/68 (#1361)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Kim, JinSan
2023-02-21 12:37:03 +09:00
committed by GitHub
parent cc5e5cab3c
commit c9075920c1
14 changed files with 213 additions and 74 deletions

View File

@ -64,6 +64,12 @@ impl Capability {
pub fn is_eth_v67(&self) -> bool {
self.name == "eth" && self.version == 67
}
/// Whether this is eth v68.
#[inline]
pub fn is_eth_v68(&self) -> bool {
self.name == "eth" && self.version == 68
}
}
#[cfg(any(test, feature = "arbitrary"))]
@ -97,6 +103,7 @@ pub struct Capabilities {
inner: Vec<Capability>,
eth_66: bool,
eth_67: bool,
eth_68: bool,
}
impl Capabilities {
@ -115,7 +122,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub fn supports_eth(&self) -> bool {
self.eth_67 || self.eth_66
self.eth_68 || self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
@ -129,6 +136,12 @@ impl Capabilities {
pub fn supports_eth_v67(&self) -> bool {
self.eth_67
}
/// Whether this peer supports eth v68 protocol.
#[inline]
pub fn supports_eth_v68(&self) -> bool {
self.eth_68
}
}
impl From<Vec<Capability>> for Capabilities {
@ -136,6 +149,7 @@ impl From<Vec<Capability>> for Capabilities {
Self {
eth_66: value.iter().any(Capability::is_eth_v66),
eth_67: value.iter().any(Capability::is_eth_v67),
eth_68: value.iter().any(Capability::is_eth_v68),
inner: value,
}
}
@ -154,6 +168,7 @@ impl Decodable for Capabilities {
Ok(Self {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
eth_68: inner.iter().any(Capability::is_eth_v68),
inner,
})
}
@ -227,6 +242,15 @@ pub enum SharedCapabilityError {
mod tests {
use super::*;
#[test]
fn from_eth_68() {
let capability = SharedCapability::new("eth", 68, 0).unwrap();
assert_eq!(capability.name(), "eth");
assert_eq!(capability.version(), 68);
assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth68, offset: 0 });
}
#[test]
fn from_eth_67() {
let capability = SharedCapability::new("eth", 67, 0).unwrap();
@ -244,4 +268,19 @@ mod tests {
assert_eq!(capability.version(), 66);
assert_eq!(capability, SharedCapability::Eth { version: EthVersion::Eth66, offset: 0 });
}
#[test]
fn capabilities_supports_eth() {
let capabilities: Capabilities = vec![
Capability::new("eth".into(), 66),
Capability::new("eth".into(), 67),
Capability::new("eth".into(), 68),
]
.into();
assert!(capabilities.supports_eth());
assert!(capabilities.supports_eth_v66());
assert!(capabilities.supports_eth_v67());
assert!(capabilities.supports_eth_v68());
}
}

View File

@ -1,5 +1,7 @@
//! Error handling for (`EthStream`)[crate::EthStream]
use crate::{errors::P2PStreamError, DisconnectReason};
use crate::{
errors::P2PStreamError, version::ParseVersionError, DisconnectReason, EthMessageID, EthVersion,
};
use reth_primitives::{Chain, ValidationError, H256};
use std::io;
@ -10,9 +12,15 @@ pub enum EthStreamError {
#[error(transparent)]
P2PStreamError(#[from] P2PStreamError),
#[error(transparent)]
ParseVersionError(#[from] ParseVersionError),
#[error(transparent)]
EthHandshakeError(#[from] EthHandshakeError),
#[error("For {0:?} version, message id({1:?}) is invalid")]
EthInvalidMessageError(EthVersion, EthMessageID),
#[error("message size ({0}) exceeds max length (10MB)")]
MessageTooBig(usize),
#[error("TransactionHashes invalid len of fields: hashes_len={hashes_len} types_len={types_len} sizes_len={sizes_len}")]
TransactionHashesInvalidLenOfFields { hashes_len: usize, types_len: usize, sizes_len: usize },
}
// === impl EthStreamError ===

View File

@ -2,6 +2,7 @@ use crate::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
types::{EthMessage, ProtocolMessage, Status},
EthVersion,
};
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
@ -9,7 +10,7 @@ use reth_primitives::{
bytes::{Bytes, BytesMut},
ForkFilter,
};
use reth_rlp::{Decodable, Encodable};
use reth_rlp::Encodable;
use std::{
pin::Pin,
task::{Context, Poll},
@ -76,11 +77,12 @@ where
return Err(EthStreamError::MessageTooBig(their_msg.len()))
}
let msg = match ProtocolMessage::decode(&mut their_msg.as_ref()) {
let version = EthVersion::try_from(status.version)?;
let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
tracing::debug!("rlp decode error in eth handshake: msg={their_msg:x}");
return Err(err.into())
tracing::debug!("decode error in eth handshake: msg={their_msg:x}");
return Err(err)
}
};
@ -120,7 +122,7 @@ where
// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(self.inner);
let stream = EthStream::new(version, self.inner);
Ok((stream, resp))
}
@ -136,6 +138,7 @@ where
#[pin_project]
#[derive(Debug)]
pub struct EthStream<S> {
version: EthVersion,
#[pin]
inner: S,
}
@ -143,8 +146,13 @@ pub struct EthStream<S> {
impl<S> EthStream<S> {
/// Creates a new unauthed [`EthStream`] from a provided stream. You will need
/// to manually handshake a peer.
pub fn new(inner: S) -> Self {
Self { inner }
pub fn new(version: EthVersion, inner: S) -> Self {
Self { version, inner }
}
/// Returns the eth version.
pub fn version(&self) -> EthVersion {
self.version
}
/// Returns the underlying stream.
@ -203,11 +211,11 @@ where
return Poll::Ready(Some(Err(EthStreamError::MessageTooBig(bytes.len()))))
}
let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
let msg = match ProtocolMessage::decode_message(*this.version, &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
tracing::debug!("rlp decode error: msg={bytes:x}");
return Poll::Ready(Some(Err(err.into())))
tracing::debug!("decode error: msg={bytes:x}");
return Poll::Ready(Some(Err(err)))
}
};
@ -337,7 +345,7 @@ mod tests {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = PassthroughCodec::default().framed(incoming);
let mut stream = EthStream::new(stream);
let mut stream = EthStream::new(EthVersion::Eth67, stream);
// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
@ -346,7 +354,7 @@ mod tests {
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let sink = PassthroughCodec::default().framed(outgoing);
let mut client_stream = EthStream::new(sink);
let mut client_stream = EthStream::new(EthVersion::Eth67, sink);
client_stream.send(test_msg).await.unwrap();
@ -372,7 +380,7 @@ mod tests {
// roughly based off of the design of tokio::net::TcpListener
let (incoming, _) = listener.accept().await.unwrap();
let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
let mut stream = EthStream::new(stream);
let mut stream = EthStream::new(EthVersion::Eth67, stream);
// use the stream to get the next message
let message = stream.next().await.unwrap().unwrap();
@ -386,7 +394,7 @@ mod tests {
let outgoing = TcpStream::connect(local_addr).await.unwrap();
let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
let mut client_stream = EthStream::new(outgoing);
let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);
client_stream.send(test_msg).await.unwrap();

View File

@ -100,7 +100,7 @@ impl HelloMessageBuilder {
protocol_version: protocol_version.unwrap_or_default(),
client_version: client_version.unwrap_or_else(|| DEFAULT_CLIENT_VERSION.to_string()),
capabilities: capabilities
.unwrap_or_else(|| vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()]),
.unwrap_or_else(|| vec![EthVersion::Eth67.into(), EthVersion::Eth66.into()]),
port: port.unwrap_or(30303),
id,
}

View File

@ -948,7 +948,7 @@ mod tests {
#[test]
fn test_peer_lower_capability_version() {
let local_capabilities: Vec<Capability> =
vec![EthVersion::Eth66.into(), EthVersion::Eth67.into()];
vec![EthVersion::Eth66.into(), EthVersion::Eth67.into(), EthVersion::Eth68.into()];
let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];
let shared_capability =

View File

@ -104,19 +104,33 @@ pub struct SharedTransactions(
#[derive_arbitrary(rlp)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NewPooledTransactionHashes(
pub struct NewPooledTransactionHashes66(
/// Transaction hashes for new transactions that have appeared on the network.
/// Clients should request the transactions with the given hashes using a
/// [`GetPooledTransactions`](crate::GetPooledTransactions) message.
pub Vec<H256>,
);
impl From<Vec<H256>> for NewPooledTransactionHashes {
impl From<Vec<H256>> for NewPooledTransactionHashes66 {
fn from(v: Vec<H256>) -> Self {
NewPooledTransactionHashes(v)
NewPooledTransactionHashes66(v)
}
}
/// Same as [`NewPooledTransactionHashes66`] but extends that that beside the transaction hashes,
/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well.
#[derive_arbitrary(rlp)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NewPooledTransactionHashes68 {
/// Transaction types for new transactions that have appeared on the network.
pub types: Vec<u8>,
/// Transaction sizes for new transactions that have appeared on the network.
pub sizes: Vec<usize>,
/// Transaction hashes for new transactions that have appeared on the network.
pub hashes: Vec<H256>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,10 +1,10 @@
#![allow(missing_docs)]
use super::{
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes,
NodeData, PooledTransactions, Receipts, Status, Transactions,
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, Transactions,
};
use crate::SharedTransactions;
use crate::{errors::EthStreamError, EthVersion, SharedTransactions};
use reth_primitives::bytes::{Buf, BufMut};
use reth_rlp::{length_of_length, Decodable, Encodable, Header};
use std::{fmt::Debug, sync::Arc};
@ -22,10 +22,9 @@ pub struct ProtocolMessage {
impl ProtocolMessage {
/// Create a new ProtocolMessage from a message type and message rlp bytes.
pub fn decode_message(
message_type: EthMessageID,
buf: &mut &[u8],
) -> Result<Self, reth_rlp::DecodeError> {
pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, EthStreamError> {
let message_type = EthMessageID::decode(buf)?;
let message = match message_type {
EthMessageID::Status => EthMessage::Status(Status::decode(buf)?),
EthMessageID::NewBlockHashes => {
@ -34,7 +33,15 @@ impl ProtocolMessage {
EthMessageID::NewBlock => EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?)),
EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
EthMessageID::NewPooledTransactionHashes => {
EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes::decode(buf)?)
if version >= EthVersion::Eth68 {
EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
buf,
)?)
} else {
EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
buf,
)?)
}
}
EthMessageID::GetBlockHeaders => {
let request_pair = RequestPair::<GetBlockHeaders>::decode(buf)?;
@ -61,10 +68,22 @@ impl ProtocolMessage {
EthMessage::PooledTransactions(request_pair)
}
EthMessageID::GetNodeData => {
if version >= EthVersion::Eth67 {
return Err(EthStreamError::EthInvalidMessageError(
version,
EthMessageID::GetNodeData,
))
}
let request_pair = RequestPair::<GetNodeData>::decode(buf)?;
EthMessage::GetNodeData(request_pair)
}
EthMessageID::NodeData => {
if version >= EthVersion::Eth67 {
return Err(EthStreamError::EthInvalidMessageError(
version,
EthMessageID::GetNodeData,
))
}
let request_pair = RequestPair::<NodeData>::decode(buf)?;
EthMessage::NodeData(request_pair)
}
@ -93,15 +112,6 @@ impl Encodable for ProtocolMessage {
}
}
/// Decodes a protocol message from bytes, using the first byte to determine the message type.
/// This decodes `eth/66` request ids for each message type.
impl Decodable for ProtocolMessage {
fn decode(buf: &mut &[u8]) -> Result<Self, reth_rlp::DecodeError> {
let message_type = EthMessageID::decode(buf)?;
Self::decode_message(message_type, buf)
}
}
impl From<EthMessage> for ProtocolMessage {
fn from(message: EthMessage) -> Self {
ProtocolMessage { message_type: message.message_id(), message }
@ -133,7 +143,7 @@ impl From<EthBroadcastMessage> for ProtocolBroadcastMessage {
}
}
/// Represents a message in the eth wire protocol, versions 66 and 67.
/// Represents a message in the eth wire protocol, versions 66, 67 and 68.
///
/// The ethereum wire protocol is a set of messages that are broadcasted to the network in two
/// styles:
@ -141,8 +151,15 @@ impl From<EthBroadcastMessage> for ProtocolBroadcastMessage {
/// response message (such as [`PooledTransactions`]).
/// * A message that is broadcast to the network, without a corresponding request.
///
/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
/// correlate request-response message pairs. This allows for request multiplexing.
/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
/// correlate request-response message pairs. This allows for request multiplexing.
///
/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
/// [``NodeData].
///
/// The `eth/68` changes only NewPooledTransactionHashes to include `types` and `sized`. For
/// it, NewPooledTransactionHashes is renamed as [`NewPooledTransactionHashes66`] and
/// [`NewPooledTransactionHashes68`] is defined.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum EthMessage {
@ -152,7 +169,8 @@ pub enum EthMessage {
NewBlockHashes(NewBlockHashes),
NewBlock(Box<NewBlock>),
Transactions(Transactions),
NewPooledTransactionHashes(NewPooledTransactionHashes),
NewPooledTransactionHashes66(NewPooledTransactionHashes66),
NewPooledTransactionHashes68(NewPooledTransactionHashes68),
// The following messages are request-response message pairs
GetBlockHeaders(RequestPair<GetBlockHeaders>),
@ -175,7 +193,8 @@ impl EthMessage {
EthMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
EthMessage::NewBlock(_) => EthMessageID::NewBlock,
EthMessage::Transactions(_) => EthMessageID::Transactions,
EthMessage::NewPooledTransactionHashes(_) => EthMessageID::NewPooledTransactionHashes,
EthMessage::NewPooledTransactionHashes66(_) |
EthMessage::NewPooledTransactionHashes68(_) => EthMessageID::NewPooledTransactionHashes,
EthMessage::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
EthMessage::BlockHeaders(_) => EthMessageID::BlockHeaders,
EthMessage::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
@ -197,7 +216,8 @@ impl Encodable for EthMessage {
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
EthMessage::NewBlock(new_block) => new_block.encode(out),
EthMessage::Transactions(transactions) => transactions.encode(out),
EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out),
EthMessage::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
EthMessage::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
EthMessage::GetBlockHeaders(request) => request.encode(out),
EthMessage::BlockHeaders(headers) => headers.encode(out),
EthMessage::GetBlockBodies(request) => request.encode(out),
@ -216,7 +236,8 @@ impl Encodable for EthMessage {
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
EthMessage::NewBlock(new_block) => new_block.length(),
EthMessage::Transactions(transactions) => transactions.length(),
EthMessage::NewPooledTransactionHashes(hashes) => hashes.length(),
EthMessage::NewPooledTransactionHashes66(hashes) => hashes.length(),
EthMessage::NewPooledTransactionHashes68(hashes) => hashes.length(),
EthMessage::GetBlockHeaders(request) => request.length(),
EthMessage::BlockHeaders(headers) => headers.length(),
EthMessage::GetBlockBodies(request) => request.length(),
@ -403,7 +424,10 @@ where
#[cfg(test)]
mod test {
use crate::types::message::RequestPair;
use crate::{
errors::EthStreamError, types::message::RequestPair, EthMessage, EthMessageID, GetNodeData,
NodeData, ProtocolMessage,
};
use hex_literal::hex;
use reth_rlp::{Decodable, Encodable};
@ -413,6 +437,25 @@ mod test {
buf
}
#[test]
fn test_removed_message_at_eth67() {
let get_node_data =
EthMessage::GetNodeData(RequestPair { request_id: 1337, message: GetNodeData(vec![]) });
let buf = encode(ProtocolMessage {
message_type: EthMessageID::GetNodeData,
message: get_node_data,
});
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..))));
let node_data =
EthMessage::NodeData(RequestPair { request_id: 1337, message: NodeData(vec![]) });
let buf =
encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
assert!(matches!(msg, Err(EthStreamError::EthInvalidMessageError(..))));
}
#[test]
fn request_pair_encode() {
let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

View File

@ -11,13 +11,16 @@ pub struct ParseVersionError(String);
/// The `eth` protocol version.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum EthVersion {
/// The `eth` protocol version 66.
Eth66 = 66,
/// The `eth` protocol version 67.
Eth67 = 67,
/// The `eth` protocol version 68.
Eth68 = 68,
}
impl EthVersion {
@ -25,8 +28,8 @@ impl EthVersion {
pub fn total_messages(&self) -> u8 {
match self {
EthVersion::Eth66 => 15,
EthVersion::Eth67 => {
// eth/67 is eth/66 minus GetNodeData and NodeData messages
EthVersion::Eth67 | EthVersion::Eth68 => {
// eth/67,68 are eth/66 minus GetNodeData and NodeData messages
13
}
}
@ -50,6 +53,7 @@ impl TryFrom<&str> for EthVersion {
match s {
"66" => Ok(EthVersion::Eth66),
"67" => Ok(EthVersion::Eth67),
"68" => Ok(EthVersion::Eth68),
_ => Err(ParseVersionError(s.to_string())),
}
}
@ -72,6 +76,7 @@ impl TryFrom<u8> for EthVersion {
match u {
66 => Ok(EthVersion::Eth66),
67 => Ok(EthVersion::Eth67),
68 => Ok(EthVersion::Eth68),
_ => Err(ParseVersionError(u.to_string())),
}
}
@ -99,6 +104,7 @@ impl From<EthVersion> for &'static str {
match v {
EthVersion::Eth66 => "66",
EthVersion::Eth67 => "67",
EthVersion::Eth68 => "68",
}
}
}
@ -119,13 +125,15 @@ mod test {
fn test_eth_version_try_from_str() {
assert_eq!(EthVersion::Eth66, EthVersion::try_from("66").unwrap());
assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap());
assert_eq!(Err(ParseVersionError("68".to_string())), EthVersion::try_from("68"));
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
assert_eq!(Err(ParseVersionError("69".to_string())), EthVersion::try_from("69"));
}
#[test]
fn test_eth_version_from_str() {
assert_eq!(EthVersion::Eth66, "66".parse().unwrap());
assert_eq!(EthVersion::Eth67, "67".parse().unwrap());
assert_eq!(Err(ParseVersionError("68".to_string())), "68".parse::<EthVersion>());
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
assert_eq!(Err(ParseVersionError("69".to_string())), "69".parse::<EthVersion>());
}
}

View File

@ -51,8 +51,8 @@ pub mod fuzz_rlp {
use reth_eth_wire::{
BlockBodies, BlockHeaders, DisconnectReason, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, HelloMessage, NewBlock, NewBlockHashes,
NewPooledTransactionHashes, NodeData, P2PMessage, PooledTransactions, Receipts, Status,
Transactions,
NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData, P2PMessage,
PooledTransactions, Receipts, Status, Transactions,
};
use reth_primitives::{BlockHashOrNumber, TransactionSigned};
use reth_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
@ -137,7 +137,8 @@ pub mod fuzz_rlp {
fuzz_type_and_name!(GetBlockBodies, fuzz_GetBlockBodies);
fuzz_type_and_name!(BlockBodies, fuzz_BlockBodies);
fuzz_type_and_name!(NewBlock, fuzz_NewBlock);
fuzz_type_and_name!(NewPooledTransactionHashes, fuzz_NewPooledTransactionHashes);
fuzz_type_and_name!(NewPooledTransactionHashes66, fuzz_NewPooledTransactionHashes66);
fuzz_type_and_name!(NewPooledTransactionHashes68, fuzz_NewPooledTransactionHashes68);
fuzz_type_and_name!(GetPooledTransactions, fuzz_GetPooledTransactions);
fuzz_type_and_name!(PooledTransactions, fuzz_PooledTransactions);
fuzz_type_and_name!(GetNodeData, fuzz_GetNodeData);

View File

@ -1,5 +1,5 @@
//! Decoding tests for [`NewPooledTransactions`]
use reth_eth_wire::NewPooledTransactionHashes;
use reth_eth_wire::NewPooledTransactionHashes66;
use reth_primitives::hex;
use reth_rlp::Decodable;
use std::{fs, path::PathBuf};
@ -10,5 +10,5 @@ fn decode_new_pooled_transaction_hashes_network() {
.join("testdata/new_pooled_transactions_network_rlp");
let data = fs::read_to_string(network_data_path).expect("Unable to read file");
let hex_data = hex::decode(data.trim()).unwrap();
let _txs = NewPooledTransactionHashes::decode(&mut &hex_data[..]).unwrap();
let _txs = NewPooledTransactionHashes66::decode(&mut &hex_data[..]).unwrap();
}

View File

@ -7,7 +7,7 @@ use futures::FutureExt;
use reth_eth_wire::{
capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders,
EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes66, NodeData, PooledTransactions, Receipts,
SharedTransactions, Transactions,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
@ -50,7 +50,7 @@ pub enum PeerMessage {
/// Broadcast transactions _from_ local _to_ a peer.
SendTransactions(SharedTransactions),
/// Send new pooled transactions
PooledTransactions(NewPooledTransactionHashes),
PooledTransactions(NewPooledTransactionHashes66),
/// All `eth` request variants.
EthRequest(PeerRequest),
/// Other than eth namespace message

View File

@ -4,7 +4,7 @@ use crate::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes66, SharedTransactions};
use reth_interfaces::{
p2p::headers::client::StatusUpdater,
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
@ -144,7 +144,7 @@ impl NetworkHandle {
pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: Vec<TxHash>) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg: NewPooledTransactionHashes(msg),
msg: NewPooledTransactionHashes66(msg),
})
}
@ -292,7 +292,7 @@ pub(crate) enum NetworkHandleMessage {
/// Sends the list of transactions to the given peer.
SendTransaction { peer_id: PeerId, msg: SharedTransactions },
/// Sends the list of transactions hashes to the given peer.
SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 },
/// Send an `eth` protocol request to the peer.
EthRequest {
/// The peer to send the request to.

View File

@ -16,7 +16,7 @@ use reth_eth_wire::{
capability::Capabilities,
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
message::{EthBroadcastMessage, RequestPair},
DisconnectReason, EthMessage, EthStream, P2PStream,
DisconnectReason, EthMessage, EthStream, EthVersion, P2PStream,
};
use reth_interfaces::p2p::error::RequestError;
use reth_metrics_common::metered_sender::MeteredSender;
@ -178,9 +178,23 @@ impl ActiveSession {
EthMessage::Transactions(msg) => {
self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into()
}
EthMessage::NewPooledTransactionHashes(msg) => {
EthMessage::NewPooledTransactionHashes66(msg) => {
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg)).into()
}
EthMessage::NewPooledTransactionHashes68(msg) => {
if msg.hashes.len() != msg.types.len() || msg.hashes.len() != msg.sizes.len() {
return OnIncomingMessageOutcome::BadMessage {
error: EthStreamError::TransactionHashesInvalidLenOfFields {
hashes_len: msg.hashes.len(),
types_len: msg.types.len(),
sizes_len: msg.sizes.len(),
},
message: EthMessage::NewPooledTransactionHashes68(msg),
}
}
// TODO revise `PeerMessage::PooledTransactions` to have `types` and `sizes`
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.hashes.into())).into()
}
EthMessage::GetBlockHeaders(req) => {
on_request!(req, BlockHeaders, GetBlockHeaders)
}
@ -237,7 +251,13 @@ impl ActiveSession {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
}
PeerMessage::PooledTransactions(msg) => {
self.queued_outgoing.push_back(EthMessage::NewPooledTransactionHashes(msg).into());
if self.conn.version() >= EthVersion::Eth68 {
// TODO
// we don't know types and sizes yet
} else {
self.queued_outgoing
.push_back(EthMessage::NewPooledTransactionHashes66(msg).into());
}
}
PeerMessage::EthRequest(req) => {
let deadline = self.request_deadline();
@ -699,8 +719,8 @@ mod tests {
};
use reth_ecies::util::pk2id;
use reth_eth_wire::{
EthVersion, GetBlockBodies, HelloMessage, NewPooledTransactionHashes, ProtocolVersion,
Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
EthVersion, GetBlockBodies, HelloMessage, ProtocolVersion, Status, StatusBuilder,
UnauthedEthStream, UnauthedP2PStream,
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_primitives::{ForkFilter, Hardfork, MAINNET};
@ -929,9 +949,7 @@ mod tests {
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
for _ in 0..num_messages {
client_stream
.send(EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes(
vec![],
)))
.send(EthMessage::NewPooledTransactionHashes66(Vec::new().into()))
.await
.unwrap();
}
@ -1030,7 +1048,7 @@ mod tests {
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
client_stream
.send(EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes(vec![])))
.send(EthMessage::NewPooledTransactionHashes66(Vec::new().into()))
.await
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(100), client_stream.next()).await;

View File

@ -10,7 +10,7 @@ use crate::{
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use reth_eth_wire::{
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
GetPooledTransactions, NewPooledTransactionHashes66, PooledTransactions, Transactions,
};
use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider};
use reth_network_api::{Peers, ReputationChangeKind};
@ -249,7 +249,7 @@ where
fn on_new_pooled_transaction_hashes(
&mut self,
peer_id: PeerId,
msg: NewPooledTransactionHashes,
msg: NewPooledTransactionHashes66,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
@ -338,7 +338,7 @@ where
// Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the
// pool
if !self.network.is_syncing() {
let msg = NewPooledTransactionHashes(self.pool.pooled_transactions());
let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions());
self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg,
@ -546,7 +546,7 @@ pub enum NetworkTransactionEvent {
/// Received list of transactions from the given peer.
IncomingTransactions { peer_id: PeerId, msg: Transactions },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,