feat(net): add broadcast message variants (#224)

This commit is contained in:
Matthias Seitz
2022-11-18 20:22:29 +01:00
committed by GitHub
parent ebd27b6025
commit 1767d375de
2 changed files with 117 additions and 28 deletions

View File

@ -1,5 +1,6 @@
use crate::{
error::{EthStreamError, HandshakeError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
types::{EthMessage, ProtocolMessage, Status},
};
use bytes::{Bytes, BytesMut};
@ -142,9 +143,29 @@ impl<S> EthStream<S> {
}
}
impl<S, E> EthStream<S>
where
S: Sink<Bytes, Error = E> + Unpin,
EthStreamError: From<E>,
{
/// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead.
pub fn start_send_broadcast(
&mut self,
item: EthBroadcastMessage,
) -> Result<(), EthStreamError> {
let mut bytes = BytesMut::new();
ProtocolBroadcastMessage::from(item).encode(&mut bytes);
let bytes = bytes.freeze();
self.inner.start_send_unpin(bytes)?;
Ok(())
}
}
impl<S, E> Stream for EthStream<S>
where
S: Stream<Item = Result<bytes::BytesMut, E>> + Unpin,
S: Stream<Item = Result<BytesMut, E>> + Unpin,
EthStreamError: From<E>,
{
type Item = Result<EthMessage, EthStreamError>;

View File

@ -4,12 +4,12 @@ use super::{
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes,
NodeData, PooledTransactions, Receipts, Status, Transactions,
};
use bytes::Buf;
use bytes::{Buf, BufMut};
use reth_rlp::{length_of_length, Decodable, Encodable, Header};
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
#[derive(Clone, Debug, PartialEq, Eq)]
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolMessage {
pub message_type: EthMessageID,
pub message: EthMessage,
@ -79,13 +79,13 @@ impl ProtocolMessage {
/// Encodes the protocol message into bytes.
/// The message type is encoded as a single byte and prepended to the message.
impl Encodable for ProtocolMessage {
fn length(&self) -> usize {
self.message_type.length() + self.message.length()
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
fn encode(&self, out: &mut dyn BufMut) {
self.message_type.encode(out);
self.message.encode(out);
}
fn length(&self) -> usize {
self.message_type.length() + self.message.length()
}
}
/// Decodes a protocol message from bytes, using the first byte to determine the message type.
@ -103,7 +103,30 @@ impl From<EthMessage> for ProtocolMessage {
}
}
// TODO: determine whats up with this enum variant size warning
/// Represents messages that can be sent to multiple peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolBroadcastMessage {
pub message_type: EthMessageID,
pub message: EthBroadcastMessage,
}
/// Encodes the protocol message into bytes.
/// The message type is encoded as a single byte and prepended to the message.
impl Encodable for ProtocolBroadcastMessage {
fn encode(&self, out: &mut dyn BufMut) {
self.message_type.encode(out);
self.message.encode(out);
}
fn length(&self) -> usize {
self.message_type.length() + self.message.length()
}
}
impl From<EthBroadcastMessage> for ProtocolBroadcastMessage {
fn from(message: EthBroadcastMessage) -> Self {
ProtocolBroadcastMessage { message_type: message.message_id(), message }
}
}
/// Represents a message in the eth wire protocol, versions 66 and 67.
///
@ -162,6 +185,25 @@ impl EthMessage {
}
impl Encodable for EthMessage {
fn encode(&self, out: &mut dyn BufMut) {
match self {
EthMessage::Status(status) => status.encode(out),
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
EthMessage::NewBlock(new_block) => new_block.encode(out),
EthMessage::Transactions(transactions) => transactions.encode(out),
EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out),
EthMessage::GetBlockHeaders(request) => request.encode(out),
EthMessage::BlockHeaders(headers) => headers.encode(out),
EthMessage::GetBlockBodies(request) => request.encode(out),
EthMessage::BlockBodies(bodies) => bodies.encode(out),
EthMessage::GetPooledTransactions(request) => request.encode(out),
EthMessage::PooledTransactions(transactions) => transactions.encode(out),
EthMessage::GetNodeData(request) => request.encode(out),
EthMessage::NodeData(data) => data.encode(out),
EthMessage::GetReceipts(request) => request.encode(out),
EthMessage::Receipts(receipts) => receipts.encode(out),
}
}
fn length(&self) -> usize {
match self {
EthMessage::Status(status) => status.length(),
@ -181,23 +223,49 @@ impl Encodable for EthMessage {
EthMessage::Receipts(receipts) => receipts.length(),
}
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
}
/// Represents broadcast messages of [`EthMessage`] that can be sent to multiple peers
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage {
NewBlockHashes(Arc<NewBlockHashes>),
NewBlock(Arc<NewBlock>),
Transactions(Arc<Transactions>),
NewPooledTransactionHashes(Arc<NewPooledTransactionHashes>),
}
// === impl EthBroadcastMessage ===
impl EthBroadcastMessage {
/// Returns the message's ID.
pub fn message_id(&self) -> EthMessageID {
match self {
EthMessage::Status(status) => status.encode(out),
EthMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
EthMessage::NewBlock(new_block) => new_block.encode(out),
EthMessage::Transactions(transactions) => transactions.encode(out),
EthMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out),
EthMessage::GetBlockHeaders(request) => request.encode(out),
EthMessage::BlockHeaders(headers) => headers.encode(out),
EthMessage::GetBlockBodies(request) => request.encode(out),
EthMessage::BlockBodies(bodies) => bodies.encode(out),
EthMessage::GetPooledTransactions(request) => request.encode(out),
EthMessage::PooledTransactions(transactions) => transactions.encode(out),
EthMessage::GetNodeData(request) => request.encode(out),
EthMessage::NodeData(data) => data.encode(out),
EthMessage::GetReceipts(request) => request.encode(out),
EthMessage::Receipts(receipts) => receipts.encode(out),
EthBroadcastMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
EthBroadcastMessage::NewBlock(_) => EthMessageID::NewBlock,
EthBroadcastMessage::Transactions(_) => EthMessageID::Transactions,
EthBroadcastMessage::NewPooledTransactionHashes(_) => {
EthMessageID::NewPooledTransactionHashes
}
}
}
}
impl Encodable for EthBroadcastMessage {
fn encode(&self, out: &mut dyn BufMut) {
match self {
EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
EthBroadcastMessage::NewBlock(new_block) => new_block.encode(out),
EthBroadcastMessage::Transactions(transactions) => transactions.encode(out),
EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out),
}
}
fn length(&self) -> usize {
match self {
EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
EthBroadcastMessage::NewBlock(new_block) => new_block.length(),
EthBroadcastMessage::Transactions(transactions) => transactions.length(),
EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.length(),
}
}
}
@ -224,12 +292,12 @@ pub enum EthMessageID {
}
impl Encodable for EthMessageID {
fn length(&self) -> usize {
1
}
fn encode(&self, out: &mut dyn bytes::BufMut) {
out.put_u8(*self as u8);
}
fn length(&self) -> usize {
1
}
}
impl Decodable for EthMessageID {