diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index e7d5a523c..029d935a7 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -1,5 +1,6 @@ use crate::{ error::{EthStreamError, HandshakeError}, + message::{EthBroadcastMessage, ProtocolBroadcastMessage}, types::{EthMessage, ProtocolMessage, Status}, }; use bytes::{Bytes, BytesMut}; @@ -142,9 +143,29 @@ impl EthStream { } } +impl EthStream +where + S: Sink + Unpin, + EthStreamError: From, +{ + /// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead. + pub fn start_send_broadcast( + &mut self, + item: EthBroadcastMessage, + ) -> Result<(), EthStreamError> { + let mut bytes = BytesMut::new(); + ProtocolBroadcastMessage::from(item).encode(&mut bytes); + let bytes = bytes.freeze(); + + self.inner.start_send_unpin(bytes)?; + + Ok(()) + } +} + impl Stream for EthStream where - S: Stream> + Unpin, + S: Stream> + Unpin, EthStreamError: From, { type Item = Result; diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs index c5e3ffc8c..eb773c809 100644 --- a/crates/net/eth-wire/src/types/message.rs +++ b/crates/net/eth-wire/src/types/message.rs @@ -4,12 +4,12 @@ use super::{ GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Status, Transactions, }; -use bytes::Buf; +use bytes::{Buf, BufMut}; use reth_rlp::{length_of_length, Decodable, Encodable, Header}; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; -#[derive(Clone, Debug, PartialEq, Eq)] /// An `eth` protocol message, containing a message ID and payload. +#[derive(Clone, Debug, PartialEq, Eq)] pub struct ProtocolMessage { pub message_type: EthMessageID, pub message: EthMessage, @@ -79,13 +79,13 @@ impl ProtocolMessage { /// Encodes the protocol message into bytes. /// The message type is encoded as a single byte and prepended to the message. impl Encodable for ProtocolMessage { - fn length(&self) -> usize { - self.message_type.length() + self.message.length() - } - fn encode(&self, out: &mut dyn bytes::BufMut) { + fn encode(&self, out: &mut dyn BufMut) { self.message_type.encode(out); self.message.encode(out); } + fn length(&self) -> usize { + self.message_type.length() + self.message.length() + } } /// Decodes a protocol message from bytes, using the first byte to determine the message type. @@ -103,7 +103,30 @@ impl From for ProtocolMessage { } } -// TODO: determine whats up with this enum variant size warning +/// Represents messages that can be sent to multiple peers. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ProtocolBroadcastMessage { + pub message_type: EthMessageID, + pub message: EthBroadcastMessage, +} + +/// Encodes the protocol message into bytes. +/// The message type is encoded as a single byte and prepended to the message. +impl Encodable for ProtocolBroadcastMessage { + fn encode(&self, out: &mut dyn BufMut) { + self.message_type.encode(out); + self.message.encode(out); + } + fn length(&self) -> usize { + self.message_type.length() + self.message.length() + } +} + +impl From for ProtocolBroadcastMessage { + fn from(message: EthBroadcastMessage) -> Self { + ProtocolBroadcastMessage { message_type: message.message_id(), message } + } +} /// Represents a message in the eth wire protocol, versions 66 and 67. /// @@ -162,6 +185,25 @@ impl EthMessage { } impl Encodable for EthMessage { + fn encode(&self, out: &mut dyn BufMut) { + match self { + EthMessage::Status(status) => status.encode(out), + EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), + EthMessage::NewBlock(new_block) => new_block.encode(out), + EthMessage::Transactions(transactions) => transactions.encode(out), + EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), + EthMessage::GetBlockHeaders(request) => request.encode(out), + EthMessage::BlockHeaders(headers) => headers.encode(out), + EthMessage::GetBlockBodies(request) => request.encode(out), + EthMessage::BlockBodies(bodies) => bodies.encode(out), + EthMessage::GetPooledTransactions(request) => request.encode(out), + EthMessage::PooledTransactions(transactions) => transactions.encode(out), + EthMessage::GetNodeData(request) => request.encode(out), + EthMessage::NodeData(data) => data.encode(out), + EthMessage::GetReceipts(request) => request.encode(out), + EthMessage::Receipts(receipts) => receipts.encode(out), + } + } fn length(&self) -> usize { match self { EthMessage::Status(status) => status.length(), @@ -181,23 +223,49 @@ impl Encodable for EthMessage { EthMessage::Receipts(receipts) => receipts.length(), } } - fn encode(&self, out: &mut dyn bytes::BufMut) { +} + +/// Represents broadcast messages of [`EthMessage`] that can be sent to multiple peers +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum EthBroadcastMessage { + NewBlockHashes(Arc), + NewBlock(Arc), + Transactions(Arc), + NewPooledTransactionHashes(Arc), +} + +// === impl EthBroadcastMessage === + +impl EthBroadcastMessage { + /// Returns the message's ID. + pub fn message_id(&self) -> EthMessageID { match self { - EthMessage::Status(status) => status.encode(out), - EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), - EthMessage::NewBlock(new_block) => new_block.encode(out), - EthMessage::Transactions(transactions) => transactions.encode(out), - EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), - EthMessage::GetBlockHeaders(request) => request.encode(out), - EthMessage::BlockHeaders(headers) => headers.encode(out), - EthMessage::GetBlockBodies(request) => request.encode(out), - EthMessage::BlockBodies(bodies) => bodies.encode(out), - EthMessage::GetPooledTransactions(request) => request.encode(out), - EthMessage::PooledTransactions(transactions) => transactions.encode(out), - EthMessage::GetNodeData(request) => request.encode(out), - EthMessage::NodeData(data) => data.encode(out), - EthMessage::GetReceipts(request) => request.encode(out), - EthMessage::Receipts(receipts) => receipts.encode(out), + EthBroadcastMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes, + EthBroadcastMessage::NewBlock(_) => EthMessageID::NewBlock, + EthBroadcastMessage::Transactions(_) => EthMessageID::Transactions, + EthBroadcastMessage::NewPooledTransactionHashes(_) => { + EthMessageID::NewPooledTransactionHashes + } + } + } +} + +impl Encodable for EthBroadcastMessage { + fn encode(&self, out: &mut dyn BufMut) { + match self { + EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), + EthBroadcastMessage::NewBlock(new_block) => new_block.encode(out), + EthBroadcastMessage::Transactions(transactions) => transactions.encode(out), + EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), + } + } + + fn length(&self) -> usize { + match self { + EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(), + EthBroadcastMessage::NewBlock(new_block) => new_block.length(), + EthBroadcastMessage::Transactions(transactions) => transactions.length(), + EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.length(), } } } @@ -224,12 +292,12 @@ pub enum EthMessageID { } impl Encodable for EthMessageID { - fn length(&self) -> usize { - 1 - } fn encode(&self, out: &mut dyn bytes::BufMut) { out.put_u8(*self as u8); } + fn length(&self) -> usize { + 1 + } } impl Decodable for EthMessageID {