diff --git a/docs/repo/crates/eth-wire.md b/docs/repo/crates/eth-wire.md new file mode 100644 index 000000000..0c216644f --- /dev/null +++ b/docs/repo/crates/eth-wire.md @@ -0,0 +1,375 @@ +# eth-wire + +The `eth-wire` crate provides abstractions over the [RLPx](https://github.com/ethereum/devp2p/blob/master/rlpx.md) and +[Eth wire](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) protocols. + +This crate can be thought of as having 2 components: + +1. Data structures which serialize and deserialize the eth protcol messages into Rust compatible types. +2. Abstractions over Tokio Streams which operate on these types. + +(Note that ECIES is implemented in a seperate `reth-ecies` crate.) +## Types +The most basic Eth-wire type is an `ProtocolMessage`. It describes all messages that reth can send/receive. + +[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/message.rs) +```rust, ignore +/// An `eth` protocol message, containing a message ID and payload. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ProtocolMessage { + pub message_type: EthMessageID, + pub message: EthMessage, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum EthMessage { + Status(Status), + NewBlockHashes(NewBlockHashes), + Transactions(Transactions), + NewPooledTransactionHashes(NewPooledTransactionHashes), + GetBlockHeaders(RequestPair), + // ... + GetReceipts(RequestPair), + Receipts(RequestPair), +} + +/// Represents message IDs for eth protocol messages. +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum EthMessageID { + Status = 0x00, + NewBlockHashes = 0x01, + Transactions = 0x02, + // ... + NodeData = 0x0e, + GetReceipts = 0x0f, + Receipts = 0x10, +} + +``` +Messages can either be broadcast to the network, or can be a request/response message to a single peer. This 2nd type of message is +described using a `RequestPair` struct, which is simply a concatenation of the underlying message with a request id. + +[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/message.rs) +```rust, ignore +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RequestPair { + pub request_id: u64, + pub message: T, +} + +``` +Every `Ethmessage` has a correspoding rust struct which implements the `Encodable` and `Decodable` traits. +These traits are defined as follows: + +[Crate: crates/common/rlp](https://github.com/paradigmxyz/reth/blob/main/crates/common/rlp) +```rust, ignore +pub trait Decodable: Sized { + fn decode(buf: &mut &[u8]) -> Result; +} +pub trait Encodable { + fn encode(&self, out: &mut dyn BufMut); + fn length(&self) -> usize; +} +``` +These traits describe how the `Ethmessage` should be serialized/deserialized into raw bytes using the RLP format. +In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by the `common/rlp` and `common/rlp-derive` crates. + +Note that the `ProtocolMessage` itself implements these traits, so any stream of bytes can be converted into it by calling `ProtocolMessage::decode()` and vice versa with `ProtocolMessage::encode()`. The message type is determined by the first byte of the byte stream. + +### Example: The Transactions message +Let's understand how an `EthMessage` is implemented by taking a look at the `Transactions` Message. The eth specification describes a Transaction message as a list of RLP encoded transactions: + +[File: ethereum/devp2p/caps/eth.md](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02) +``` +Transactions (0x02) +[tx₁, tx₂, ...] + +Specify transactions that the peer should make sure is included on its transaction queue. +The items in the list are transactions in the format described in the main Ethereum specification. +... + +``` + +In reth, this is represented as: + +[File: crates/net/eth-wire/src/types/broadcast.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/broadcast.rs) +```rust,ignore +pub struct Transactions( + /// New transactions for the peer to include in its mempool. + pub Vec, +); +``` + +And the corresponding trait implementations are present in the primitives crate. + +[File: crates/primitives/src/transaction/mod.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/transaction/mod.rs) +```rust, ignore +#[main_codec] +#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)] +pub struct TransactionSigned { + pub hash: TxHash, + pub signature: Signature, + #[deref] + #[as_ref] + pub transaction: Transaction, +} + +impl Encodable for TransactionSigned { + fn encode(&self, out: &mut dyn bytes::BufMut) { + self.encode_inner(out, true); + } + + fn length(&self) -> usize { + let len = self.payload_len(); + len + length_of_length(len) + } +} + +impl Decodable for TransactionSigned { + fn decode(buf: &mut &[u8]) -> Result { + // Implementation omitted for brevity + //... + } + +``` +Now that we know how the types work, let's take a look at how these are utilized in the network. + +## P2PStream +The lowest level stream to communicate with other peers is the P2P stream. It takes an underlying Tokio stream and does the following: + +- Tracks and Manages Ping and pong messages and sends them when needed. +- Keeps track of the SharedCapabilities between the reth node and its peers. +- Receives bytes from peers, decompresses and forwards them to its parent stream. +- Receives bytes from its parent stream, compresses them and sends it to peers. + +Decompression/Compression of bytes is done with snappy algorithm ([EIP 706](https://eips.ethereum.org/EIPS/eip-706)) +using the external `snap` crate. + +[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs) +```rust,ignore +#[pin_project] +pub struct P2PStream { + #[pin] + inner: S, + encoder: snap::raw::Encoder, + decoder: snap::raw::Decoder, + pinger: Pinger, + shared_capability: SharedCapability, + outgoing_messages: VecDeque, + disconnecting: bool, +} +``` +### Pinger +To manage pinging, an instance of the `Pinger` struct is used. This is a state machine which keeps track of how many pings +we have sent/received and the timeouts associated with them. + +[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/pinger.rs) +```rust,ignore +#[derive(Debug)] +pub(crate) struct Pinger { + /// The timer used for the next ping. + ping_interval: Interval, + /// The timer used for the next ping. + timeout_timer: Pin>, + timeout: Duration, + state: PingState, +} + +/// This represents the possible states of the pinger. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PingState { + /// There are no pings in flight, or all pings have been responded to. + Ready, + /// We have sent a ping and are waiting for a pong, but the peer has missed n pongs. + WaitingForPong, + /// The peer has failed to respond to a ping. + TimedOut, +} +``` + +State transitions are then implemented like a future, with the `poll_ping` function advancing the state of the pinger. + +[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/pinger.rs) +```rust, ignore +pub(crate) fn poll_ping( + &mut self, + cx: &mut Context<'_>, +) -> Poll> { + match self.state() { + PingState::Ready => { + if self.ping_interval.poll_tick(cx).is_ready() { + self.timeout_timer.as_mut().reset(Instant::now() + self.timeout); + self.state = PingState::WaitingForPong; + return Poll::Ready(Ok(PingerEvent::Ping)) + } + } + PingState::WaitingForPong => { + if self.timeout_timer.is_elapsed() { + self.state = PingState::TimedOut; + return Poll::Ready(Ok(PingerEvent::Timeout)) + } + } + PingState::TimedOut => { + return Poll::Pending + } + }; + Poll::Pending +``` + +### Sending and receiving data +To send and recieve data, the P2PStream itself is a future which implemenents the `Stream` and `Sink` traits from the `futures` crate. + +For the `Stream` trait, the `inner` stream is polled, decompressed and returned. Most of the code is just +error handling and is omitted here for clarity. + +[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs) +```rust,ignore + +impl Stream for P2PStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + while let Poll::Ready(res) = this.inner.poll_next_unpin(cx) { + let bytes = match res { + Some(Ok(bytes)) => bytes, + Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))), + None => return Poll::Ready(None), + }; + let decompressed_len = snap::raw::decompress_len(&bytes[1..])?; + let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1); + this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?; + // ... Omitted Error handling + decompress_buf[0] = bytes[0] - this.shared_capability.offset(); + return Poll::Ready(Some(Ok(decompress_buf))) + } + } +} +``` + +Similarly, for the `Sink` trait, we do the reverse, compressing and sending data out to the `inner` stream. +The important functions in this trait are shown below. + +[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs) +```rust, ignore +impl Sink for P2PStream { + fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { + let this = self.project(); + let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1)); + let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?; + compressed.truncate(compressed_size + 1); + compressed[0] = item[0] + this.shared_capability.offset(); + this.outgoing_messages.push_back(compressed.freeze()); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + match ready!(this.inner.as_mut().poll_flush(cx)) { + Err(err) => return Poll::Ready(Err(err.into())), + Ok(()) => { + if let Some(message) = this.outgoing_messages.pop_front() { + if let Err(err) = this.inner.as_mut().start_send(message) { + return Poll::Ready(Err(err.into())) + } + } else { + return Poll::Ready(Ok(())) + } + } + } + } + } +} +``` + + +## EthStream +The EthStream is very simple, it does not keep track of any state, it simply wraps the P2Pstream. + +[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs) +```rust,ignore +#[pin_project] +pub struct EthStream { + #[pin] + inner: S, +} +``` +EthStream's only job is to perform the RLP decoding/encoding, using the `ProtocolMessage::decode()` and `ProtocolMessage::encode()` +functions we looked at earlier. + +[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs) +```rust,ignore +impl Stream for EthStream { + // ... + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let bytes = ready!(this.inner.poll_next(cx)).unwrap(); + // ... + let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) { + Ok(m) => m, + Err(err) => { + return Poll::Ready(Some(Err(err.into()))) + } + }; + Poll::Ready(Some(Ok(msg.message))) + } +} + +impl Sink for EthStream { + // ... + fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> { + // ... + let mut bytes = BytesMut::new(); + ProtocolMessage::from(item).encode(&mut bytes); + + let bytes = bytes.freeze(); + self.project().inner.start_send(bytes)?; + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx).map_err(Into::into) + } +} +``` +## Unauthed streams +For a session to be established, peers in the Ethereum network must first exchange a `Hello` message in the RLPx layer and then a +`Status` message in the eth-wire layer. + +To perform these, reth has special `Unauthed` versions of streams described above. + +The `UnauthedP2Pstream` does the `Hello` handshake and returns a `P2PStream`. + +[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs) +```rust, ignore +#[pin_project] +pub struct UnauthedP2PStream { + #[pin] + inner: S, +} + +impl UnauthedP2PStream { + // ... + pub async fn handshake(mut self, hello: HelloMessage) -> Result<(P2PStream, HelloMessage), Error> { + let mut raw_hello_bytes = BytesMut::new(); + P2PMessage::Hello(hello.clone()).encode(&mut raw_hello_bytes); + + self.inner.send(raw_hello_bytes.into()).await?; + let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next()).await; + + let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) { + Ok(P2PMessage::Hello(hello)) => Ok(hello), + // ... + } + }?; + let stream = P2PStream::new(self.inner, capability); + + Ok((stream, their_hello)) + } +} + +``` +Similary, UnauthedEthStream does the `Status` handshake and returns an `EthStream`. The code is [here](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs) + +