mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
ci: split ci workflow (#1345)
docs/crates/README.md (new file, 9 lines)
## Crates

A tour of the various crates that make up reth.

- [`reth-db`](./db.md)
- [`reth-discv4`](./discv4.md)
- [`reth-eth-wire`](./eth-wire.md)
- [`reth-network`](./network.md)
- [`reth-stages`](./stages.md)
docs/crates/db.md (new file, 341 lines)
# db

The database is a central component of Reth, providing persistent storage for data such as block headers, block bodies, transactions and more. The Reth database is key-value storage written to disk and organized into tables. This chapter might feel a little dense at first, but before long you will feel comfortable understanding and navigating the `db` crate. This chapter will go through the structure of the database, its tables and the mechanics of the `Database` trait.

<br>

## Tables

Within Reth, the database is organized via "tables". A table is any struct that implements the `Table` trait.

[File: crates/storage/db/src/abstraction/table.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/table.rs#L56-L65)

```rust ignore
pub trait Table: Send + Sync + Debug + 'static {
    /// Return table name as it is present inside the MDBX.
    const NAME: &'static str;
    /// Key element of `Table`.
    ///
    /// Sorting should be taken into account when encoding this.
    type Key: Key;
    /// Value element of `Table`.
    type Value: Value;
}

//--snip--
pub trait Key: Encode + Decode + Ord {}

//--snip--
pub trait Value: Compress + Decompress + Serialize {}
```

The `Table` trait has two associated types, `Key` and `Value`, which must implement the `Key` and `Value` traits, respectively. The `Encode` trait is responsible for transforming data into bytes so it can be stored in the database, while the `Decode` trait transforms the bytes back into their original form. Similarly, the `Compress` and `Decompress` traits transform the data to and from a compressed format when storing or reading it from the database.
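To make these traits concrete, here is a minimal, self-contained sketch in the same spirit (hypothetical simplified traits, not reth's actual definitions). Note how encoding a numeric key as big-endian bytes preserves its sort order, which matters for the `Ord` bound on keys:

```rust
use std::fmt::Debug;

/// Simplified stand-ins for reth's `Encode`/`Decode` traits (illustrative only).
pub trait Encode {
    fn encode(&self) -> Vec<u8>;
}
pub trait Decode: Sized {
    fn decode(bytes: &[u8]) -> Option<Self>;
}

/// A pared-down `Table` trait in the spirit of the one above.
pub trait Table: Send + Sync + Debug + 'static {
    const NAME: &'static str;
    type Key: Encode + Decode + Ord;
    type Value;
}

// Big-endian encoding keeps numeric keys byte-wise sorted, so the
// on-disk ordering of encoded keys matches the logical ordering.
impl Encode for u64 {
    fn encode(&self) -> Vec<u8> {
        self.to_be_bytes().to_vec()
    }
}
impl Decode for u64 {
    fn decode(bytes: &[u8]) -> Option<Self> {
        Some(u64::from_be_bytes(bytes.try_into().ok()?))
    }
}

/// A hypothetical table mapping block numbers to header hashes.
#[derive(Debug)]
pub struct CanonicalHeaders;

impl Table for CanonicalHeaders {
    const NAME: &'static str = "CanonicalHeaders";
    type Key = u64;        // block number
    type Value = [u8; 32]; // header hash
}
```

A table is just a zero-sized marker type: it never holds data itself, it only fixes the name, key type and value type for one logical store.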
There are many tables within the node, all used to store different types of data from `Headers` to `Transactions` and more. Below is a list of all of the tables. You can follow [this link](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/tables/mod.rs#L36) if you would like to see the table definitions for any of the tables below.

- CanonicalHeaders
- HeaderTD
- HeaderNumbers
- Headers
- BlockBodies
- BlockOmmers
- Transactions
- TxHashNumber
- Receipts
- Logs
- PlainAccountState
- PlainStorageState
- Bytecodes
- BlockTransitionIndex
- TxTransitionIndex
- AccountHistory
- StorageHistory
- AccountChangeSet
- StorageChangeSet
- TxSenders
- Config
- SyncStage
<br>

## Database

Reth's database design revolves around its main [Database trait](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/interfaces/src/db/mod.rs#L33), which takes advantage of [generic associated types](https://blog.rust-lang.org/2022/10/28/gats-stabilization.html) and [a few design tricks](https://sabrinajewson.org/blog/the-better-alternative-to-lifetime-gats#the-better-gats) to implement the database's functionality across many types. Let's take a quick look at the `Database` trait and how it works.

[File: crates/storage/db/src/abstraction/database.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/database.rs#L19)

```rust ignore
/// Main Database trait that spawns transactions to be executed.
pub trait Database: for<'a> DatabaseGAT<'a> {
    /// Create read only transaction.
    fn tx(&self) -> Result<<Self as DatabaseGAT<'_>>::TX, Error>;

    /// Create read write transaction only possible if database is open with write access.
    fn tx_mut(&self) -> Result<<Self as DatabaseGAT<'_>>::TXMut, Error>;

    /// Takes a function and passes a read-only transaction into it, making sure it's closed in the
    /// end of the execution.
    fn view<T, F>(&self, f: F) -> Result<T, Error>
    where
        F: Fn(&<Self as DatabaseGAT<'_>>::TX) -> T,
    {
        let tx = self.tx()?;

        let res = f(&tx);
        tx.commit()?;

        Ok(res)
    }

    /// Takes a function and passes a write-read transaction into it, making sure it's committed in
    /// the end of the execution.
    fn update<T, F>(&self, f: F) -> Result<T, Error>
    where
        F: Fn(&<Self as DatabaseGAT<'_>>::TXMut) -> T,
    {
        let tx = self.tx_mut()?;

        let res = f(&tx);
        tx.commit()?;

        Ok(res)
    }
}
```

Any type that implements the `Database` trait can create read-only or read-write database transactions, and can run closures against them through `view()` and `update()`. As an example, let's revisit the `Transaction` struct from the `stages` crate. This struct contains a field named `db`, which is a reference to a generic type `DB` that implements the `Database` trait. The `Transaction` struct can use the `db` field to store new headers, bodies and senders in the database. In the code snippet below, you can see the `Transaction::open()` method, which uses the `Database::tx_mut()` function to create a mutable transaction.
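The `view`/`update` pattern above can be sketched with a toy in-memory type (hypothetical; reth's real transactions talk to MDBX, and `commit` is elided here):

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;

/// A toy in-memory "database" illustrating the closure-based
/// `view`/`update` API shape.
#[derive(Default)]
pub struct MemDb {
    data: RefCell<BTreeMap<String, String>>,
}

impl MemDb {
    /// Run a read-only closure against the data, like `Database::view`.
    pub fn view<T>(&self, f: impl Fn(&BTreeMap<String, String>) -> T) -> T {
        f(&self.data.borrow())
    }

    /// Run a read-write closure against the data, like `Database::update`.
    /// The borrow ends (the "transaction" closes) when the closure returns.
    pub fn update<T>(&self, f: impl Fn(&mut BTreeMap<String, String>) -> T) -> T {
        f(&mut self.data.borrow_mut())
    }
}
```

The point of the design is that callers never hold a raw transaction across unrelated code: the transaction's lifetime is exactly the closure's body, so it cannot leak or be forgotten uncommitted.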
[File: crates/stages/src/db.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/db.rs#L95-L98)

```rust ignore
pub struct Transaction<'this, DB: Database> {
    /// A handle to the DB.
    pub(crate) db: &'this DB,
    tx: Option<<DB as DatabaseGAT<'this>>::TXMut>,
}

//--snip--
impl<'this, DB> Transaction<'this, DB>
where
    DB: Database,
{
    //--snip--

    /// Open a new inner transaction.
    pub fn open(&mut self) -> Result<(), Error> {
        self.tx = Some(self.db.tx_mut()?);
        Ok(())
    }
}
```
The `Database` trait also has the `DatabaseGAT` trait as a supertrait, which defines two associated types, `TX` and `TXMut`.

[File: crates/storage/db/src/abstraction/database.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/database.rs#L11)

```rust ignore
/// Implements the GAT method from:
/// https://sabrinajewson.org/blog/the-better-alternative-to-lifetime-gats#the-better-gats.
///
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
    /// RO database transaction
    type TX: DbTx<'a> + Send + Sync;
    /// RW database transaction
    type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync;
}
```

In Rust, associated types are like generics in that they can be filled in by any type satisfying their bounds; the difference is that an associated type belongs to a trait and can only be used in the context of that trait.

In the code snippet above, the `DatabaseGAT` trait has two associated types, `TX` and `TXMut`.
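Since GATs are now stable, the lifetime-parameterized associated type that `DatabaseGAT` emulates can be written directly in today's Rust. Here is a minimal sketch (hypothetical `Database`/`Tx` traits and a toy `MapDb`, not reth code) of a database type naming the concrete transaction type it hands out, borrowed from `&self`:

```rust
use std::collections::HashMap;

pub trait Tx {
    fn get(&self, key: &str) -> Option<String>;
}

/// Each implementor picks a transaction type per borrow lifetime:
/// this is exactly what `DatabaseGAT<'a>` emulates pre-GATs.
pub trait Database {
    type Tx<'a>: Tx
    where
        Self: 'a;
    fn tx(&self) -> Self::Tx<'_>;
}

pub struct MapDb {
    pub entries: HashMap<String, String>,
}

/// A "transaction" that is just a borrow of the database.
pub struct MapTx<'a> {
    db: &'a MapDb,
}

impl Tx for MapTx<'_> {
    fn get(&self, key: &str) -> Option<String> {
        self.db.entries.get(key).cloned()
    }
}

impl Database for MapDb {
    type Tx<'a> = MapTx<'a>;
    fn tx(&self) -> Self::Tx<'_> {
        MapTx { db: self }
    }
}
```

The borrow checker guarantees every `MapTx` outlives neither the database nor the call site, which is the property the `Sealed`/`__ImplicitBounds` workaround above buys on older compilers.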
The `TX` type can be any type that implements the `DbTx` trait, which provides a set of functions to interact with read-only transactions.

[File: crates/storage/db/src/abstraction/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/transaction.rs#L36)

```rust ignore
/// Read only transaction
pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> {
    /// Get value
    fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, Error>;
    /// Commit for read only transaction will consume and free transaction and allows
    /// freeing of memory pages
    fn commit(self) -> Result<bool, Error>;
    /// Iterate over read only values in table.
    fn cursor<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error>;
    /// Iterate over read only values in dup sorted table.
    fn cursor_dup<T: DupSort>(&self) -> Result<<Self as DbTxGAT<'_>>::DupCursor<T>, Error>;
}
```

The `TXMut` type can be any type that implements the `DbTxMut` trait, which provides a set of functions to interact with read/write transactions.

[File: crates/storage/db/src/abstraction/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/transaction.rs#L49)

```rust ignore
/// Read write transaction that allows writing to database
pub trait DbTxMut<'tx>: for<'a> DbTxMutGAT<'a> {
    /// Put value to database
    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), Error>;
    /// Delete value from database
    fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, Error>;
    /// Clears database.
    fn clear<T: Table>(&self) -> Result<(), Error>;
    /// Cursor for writing
    fn cursor_write<T: Table>(&self) -> Result<<Self as DbTxMutGAT<'_>>::CursorMut<T>, Error>;
    /// DupCursor for writing
    fn cursor_dup_write<T: DupSort>(
        &self,
    ) -> Result<<Self as DbTxMutGAT<'_>>::DupCursorMut<T>, Error>;
}
```
Let's take a look at the `DbTx` and `DbTxMut` traits in action. Revisiting the `Transaction` struct as an example, the `Transaction::get_block_hash()` method uses the `DbTx::get()` function to get a block header hash in the form of `self.get::<tables::CanonicalHeaders>(number)`.

[File: crates/storage/provider/src/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/provider/src/transaction.rs#L106)

```rust ignore
impl<'this, DB> Transaction<'this, DB>
where
    DB: Database,
{
    //--snip--

    /// Query [tables::CanonicalHeaders] table for block hash by block number
    pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result<BlockHash, StageError> {
        let hash = self
            .get::<tables::CanonicalHeaders>(number)?
            .ok_or(ProviderError::CanonicalHash { number })?;
        Ok(hash)
    }
    //--snip--
}

//--snip--
impl<'a, DB: Database> Deref for Transaction<'a, DB> {
    type Target = <DB as DatabaseGAT<'a>>::TXMut;
    fn deref(&self) -> &Self::Target {
        self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
    }
}
```

The `Transaction` struct implements the `Deref` trait, which returns a reference to its `tx` field, which is a `TXMut`. Recall that `TXMut` is an associated type on the `DatabaseGAT` trait, defined as `type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync;`, giving it access to all of the functions available to `DbTx`, including the `DbTx::get()` function.
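This `Deref`-through-an-`Option` trick can be shown in isolation (a hypothetical wrapper over a plain `BTreeMap`, mirroring the shape of `Transaction` above):

```rust
use std::collections::BTreeMap;
use std::ops::Deref;

/// A wrapper holding an optional inner "transaction".
pub struct TxWrapper {
    pub tx: Option<BTreeMap<u64, String>>,
}

impl Deref for TxWrapper {
    type Target = BTreeMap<u64, String>;

    // Callers can use any `BTreeMap` method directly on the wrapper.
    // Like reth's impl, this panics if no inner transaction is open.
    fn deref(&self) -> &Self::Target {
        self.tx.as_ref().expect("no open transaction")
    }
}
```

Because of `Deref` coercion, `wrapper.get(&1)` compiles even though `TxWrapper` defines no `get` method: the compiler inserts the `deref` call and dispatches to the inner type, which is how `Transaction` gets `DbTx`/`DbTxMut` methods "for free".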
Notice that the function uses a [turbofish](https://techblog.tonsser.com/posts/what-is-rusts-turbofish) to specify which table to use when passing the `key` to the `DbTx::get()` function. Taking a quick look at the function definition, a generic `T` is defined that implements the `Table` trait mentioned at the beginning of this chapter.

[File: crates/storage/db/src/abstraction/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/transaction.rs#L38)

```rust ignore
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, Error>;
```

This design pattern is very powerful and allows Reth to use the methods available to the `DbTx` and `DbTxMut` traits without having to define implementation blocks for each table within the database.
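A small self-contained sketch shows why one generic `get`/`put` pair can serve every table (hypothetical simplified `Table` trait and in-memory `Tx`, not reth's API): the turbofish picks the table, and the compiler checks the key and value types against it.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Minimal table marker trait; each table fixes its key and value types.
pub trait Table: 'static {
    const NAME: &'static str;
    type Key: std::hash::Hash + Eq + Clone + 'static;
    type Value: Clone + 'static;
}

pub struct Headers;
impl Table for Headers {
    const NAME: &'static str = "Headers";
    type Key = u64;      // block number
    type Value = String; // stand-in for a header
}

/// One generic method pair serves every table: `tx.get::<Headers>(..)`
/// selects the table at the call site via the turbofish.
#[derive(Default)]
pub struct Tx {
    tables: HashMap<&'static str, Box<dyn Any>>,
}

impl Tx {
    pub fn put<T: Table>(&mut self, key: T::Key, value: T::Value) {
        self.tables
            .entry(T::NAME)
            .or_insert_with(|| Box::new(HashMap::<T::Key, T::Value>::new()))
            .downcast_mut::<HashMap<T::Key, T::Value>>()
            .expect("table type mismatch")
            .insert(key, value);
    }

    pub fn get<T: Table>(&self, key: &T::Key) -> Option<T::Value> {
        self.tables
            .get(T::NAME)?
            .downcast_ref::<HashMap<T::Key, T::Value>>()?
            .get(key)
            .cloned()
    }
}
```

Passing the wrong key or value type for a table is a compile error, so no per-table impl blocks are needed to get type-safe access.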
Let's look at a couple of examples before moving on. In the snippet below, the `DbTxMut::put()` method is used to insert values into the `CanonicalHeaders`, `Headers` and `HeaderNumbers` tables.

[File: crates/storage/provider/src/block.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/provider/src/block.rs#L121-L125)

```rust ignore
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
```

This next example uses the transaction's cursor methods to get a `Cursor`. The `Cursor` type provides a way to traverse through rows in a database table, one row at a time. A cursor enables the program to perform an operation (updating, deleting, etc.) on each row in the table individually. The following code snippet gets a cursor for a few different tables in the database.

[File: crates/stages/src/stages/execution.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages/execution.rs#L93-L101)

```rust ignore
// Get next canonical block hashes to execute.
let mut canonicals = db_tx.cursor_read::<tables::CanonicalHeaders>()?;
// Get header with canonical hashes.
let mut headers = db_tx.cursor_read::<tables::Headers>()?;
// Get bodies (to get tx index) with canonical hashes.
let mut cumulative_tx_count = db_tx.cursor_read::<tables::CumulativeTxCount>()?;
// Get transaction of the block that we are executing.
let mut tx = db_tx.cursor_read::<tables::Transactions>()?;
// Skip sender recovery and load signer from database.
let mut tx_sender = db_tx.cursor_read::<tables::TxSenders>()?;
```
We are almost at the last stop in the tour of the `db` crate. In addition to the methods provided by the `DbTx` and `DbTxMut` traits, `DbTx` also inherits the `DbTxGAT` trait, while `DbTxMut` inherits `DbTxMutGAT`. These two traits provide various associated types related to cursors as well as methods to use the cursor types.

[File: crates/storage/db/src/abstraction/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db/src/abstraction/transaction.rs#L12-L17)

```rust ignore
pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
    /// Cursor GAT
    type Cursor<T: Table>: DbCursorRO<'a, T> + Send + Sync;
    /// DupCursor GAT
    type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T> + Send + Sync;
}
```

Let's look at an example of how cursors are used. The code snippet below contains the `unwind` method from the `BodyStage` defined in the `stages` crate. This function is responsible for unwinding any changes to the database if there is an error when executing the body stage within the Reth pipeline.

[File: crates/stages/src/stages/bodies.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages/bodies.rs#L205-L238)

```rust ignore
/// Unwind the stage.
async fn unwind(
    &mut self,
    db: &mut Transaction<'_, DB>,
    input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
    let mut tx_count_cursor = db.cursor_write::<tables::CumulativeTxCount>()?;
    let mut block_ommers_cursor = db.cursor_write::<tables::BlockOmmers>()?;
    let mut transaction_cursor = db.cursor_write::<tables::Transactions>()?;

    let mut entry = tx_count_cursor.last()?;
    while let Some((key, count)) = entry {
        if key.number() <= input.unwind_to {
            break
        }

        tx_count_cursor.delete_current()?;
        entry = tx_count_cursor.prev()?;

        if block_ommers_cursor.seek_exact(key)?.is_some() {
            block_ommers_cursor.delete_current()?;
        }

        let prev_count = entry.map(|(_, v)| v).unwrap_or_default();
        for tx_id in prev_count..count {
            if transaction_cursor.seek_exact(tx_id)?.is_some() {
                transaction_cursor.delete_current()?;
            }
        }
    }

    //--snip--
}
```
This function first grabs a mutable cursor for each of the `CumulativeTxCount`, `BlockOmmers` and `Transactions` tables.

The `tx_count_cursor` is used to get the last key-value pair written to the `CumulativeTxCount` table and delete the key-value pair that the cursor is currently pointing to.

The `block_ommers_cursor` is used to get the block ommers from the `BlockOmmers` table at the specified key and delete the entry that the cursor is currently pointing to.

Finally, the `transaction_cursor` is used to delete each transaction from the last `TxNumber` written to the database up to the current tx count.

While this is a brief look at how cursors work in the context of database tables, the chapter on the `libmdbx` crate will go into further detail on how cursors communicate with the database and what is actually happening under the hood.
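The unwind walk above can be sketched without a database at all, using a `BTreeMap` in place of the table and its methods in place of cursor calls (a hypothetical stand-in for `last`/`prev`/`delete_current`, not reth code):

```rust
use std::collections::BTreeMap;

/// Walk entries from the end of the "table", deleting until the key is at
/// or below the unwind target. Returns how many entries were deleted.
pub fn unwind_table(table: &mut BTreeMap<u64, u64>, unwind_to: u64) -> usize {
    let mut deleted = 0;
    // `last_key_value` plays the role of `cursor.last()?`.
    while let Some((&key, _)) = table.last_key_value() {
        if key <= unwind_to {
            break;
        }
        // `remove` plays the role of `delete_current()?`; re-reading the new
        // last entry on the next iteration mirrors `cursor.prev()?`.
        table.remove(&key);
        deleted += 1;
    }
    deleted
}
```

The real cursor version avoids re-seeking from scratch on every step: `prev()` moves the cursor in place, which is the main reason cursors exist for this kind of ordered walk.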
<br>

## Summary

This chapter was packed with information, so let's do a quick review. The database is made up of tables, each table being a collection of key-value pairs representing various pieces of data in the blockchain. Any struct that implements the `Database` trait can view, update or delete entries in the various tables. The database design leverages nested traits and generic associated types to provide methods to interact with each table in the database.

<br>

# Next Chapter

[Next Chapter]()
docs/crates/discv4.md (new file, 287 lines)
# Discv4

The `discv4` crate plays an important role in Reth, enabling discovery of other peers across the network. It is recommended to know how [Kademlia distributed hash tables](https://en.wikipedia.org/wiki/Kademlia) and [Ethereum's node discovery protocol](https://github.com/ethereum/devp2p/blob/master/discv4.md) work before reading through this chapter. While all concepts will be explained through the following sections, reading through the links above will make understanding this chapter much easier! With that note out of the way, let's jump into `discv4`.

## Starting the Node Discovery Protocol

As mentioned in the network and stages chapters, when the node is first started up, the `node::Command::execute()` function is called, which initializes the node and starts to run the Reth pipeline. Throughout the initialization of the node, many processes are started. One of them is the p2p network, which starts the node discovery protocol among other tasks.

[File: bin/reth/src/node/mod.rs](https://github.com/paradigmxyz/reth/blob/main/bin/reth/src/node/mod.rs#L95)

```rust ignore
pub async fn execute(&self) -> eyre::Result<()> {
    //--snip--
    let network = config
        .network_config(db.clone(), chain_id, genesis_hash, self.network.disable_discovery)
        .start_network()
        .await?;

    info!(peer_id = ?network.peer_id(), local_addr = %network.local_addr(), "Started p2p networking");

    //--snip--
}
```

During this process, a new `NetworkManager` is created through the `NetworkManager::new()` function, which starts the discovery protocol through a handful of newly spawned tasks. Let's take a look at how this actually works under the hood.
[File: crates/net/network/src/manager.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/network/src/manager.rs#L147)

```rust ignore
impl<C> NetworkManager<C>
where
    C: BlockProvider,
{
    //--snip--

    pub async fn new(config: NetworkConfig<C>) -> Result<Self, NetworkError> {
        let NetworkConfig {
            //--snip--
            secret_key,
            mut discovery_v4_config,
            discovery_addr,
            boot_nodes,
            //--snip--
            ..
        } = config;

        //--snip--

        discovery_v4_config = discovery_v4_config.map(|mut disc_config| {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(boot_nodes.clone());
            disc_config.add_eip868_pair("eth", status.forkid);
            disc_config
        });

        let discovery = Discovery::new(discovery_addr, secret_key, discovery_v4_config).await?;

        //--snip--
    }
}
```

First, the `NetworkConfig` is deconstructed and the `disc_config` is updated to merge configured [bootstrap nodes](https://github.com/paradigmxyz/reth/blob/main/crates/net/discv4/src/bootnodes.rs#L8) and add the `forkid` to adhere to [EIP 868](https://eips.ethereum.org/EIPS/eip-868). This updated configuration is then passed into the `Discovery::new()` function. Note that `Discovery` is a catch-all for all discovery services, including discv4, DNS discovery and others in the future.
[File: crates/net/network/src/discovery.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/network/src/discovery.rs#L51)

```rust ignore
impl Discovery {
    /// Spawns the discovery service.
    ///
    /// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener
    /// channel to receive all discovered nodes.
    pub async fn new(
        discovery_addr: SocketAddr,
        sk: SecretKey,
        discv4_config: Option<Discv4Config>,
    ) -> Result<Self, NetworkError> {
        let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk);

        let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config {
            let (discv4, mut discv4_service) =
                Discv4::bind(discovery_addr, local_enr, sk, disc_config)
                    .await
                    .map_err(NetworkError::Discovery)?;
            let discv4_updates = discv4_service.update_stream();
            // spawn the service
            let _discv4_service = discv4_service.spawn();
            (Some(discv4), Some(discv4_updates), Some(_discv4_service))
        } else {
            (None, None, None)
        };

        Ok(Self {
            local_enr,
            discv4,
            discv4_updates,
            _discv4_service,
            discovered_nodes: Default::default(),
            queued_events: Default::default(),
        })
    }
}
```

Within this function, the [Ethereum Node Record](https://github.com/ethereum/devp2p/blob/master/enr.md) is created. Participants in the discovery protocol are expected to maintain a node record (ENR) containing up-to-date information about different nodes in the network. All records must use the "v4" identity scheme. Other nodes may request the local record at any time by sending an ["ENRRequest" packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#enrrequest-packet-0x05).

The `NodeRecord::from_secret_key()` function takes the socket address used for discovery and the secret key. The secret key is used to derive a `secp256k1` public key, and the peer id is then derived from that public key. These values are then used to create an ENR. Ethereum Node Records are used to locate and communicate with other nodes in the network.

If the `discv4_config` supplied to the `Discovery::new()` function is `None`, the discv4 service will not be spawned. In this case, no new peers will be discovered across the network and the node will have to rely on manually added peers. However, if the `discv4_config` contains a `Some(Discv4Config)` value, then the `Discv4::bind()` function is called to bind to a new `UdpSocket` and create the discv4 service.
[File: crates/net/discv4/src/lib.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/discv4/src/lib.rs#L188)

```rust ignore
impl Discv4 {
    //--snip--
    pub async fn bind(
        local_address: SocketAddr,
        mut local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> io::Result<(Self, Discv4Service)> {
        let socket = UdpSocket::bind(local_address).await?;
        let local_addr = socket.local_addr()?;
        local_node_record.udp_port = local_addr.port();
        trace!( target : "discv4", ?local_addr,"opened UDP socket");

        let (to_service, rx) = mpsc::channel(100);

        let service =
            Discv4Service::new(socket, local_addr, local_node_record, secret_key, config, Some(rx));

        let discv4 = Discv4 { local_addr, to_service };
        Ok((discv4, service))
    }
    //--snip--
}
```

To better understand what actually happens when the discv4 service is created, let's take a deeper look at the `Discv4Service::new()` function.
[File: crates/net/discv4/src/lib.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/discv4/src/lib.rs#L392)

```rust ignore
impl Discv4Service {
    /// Create a new instance for a bound [`UdpSocket`].
    pub(crate) fn new(
        socket: UdpSocket,
        local_address: SocketAddr,
        local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
        commands_rx: Option<mpsc::Receiver<Discv4Command>>,
    ) -> Self {
        let socket = Arc::new(socket);
        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
        let mut tasks = JoinSet::<()>::new();

        let udp = Arc::clone(&socket);
        tasks.spawn(async move { receive_loop(udp, ingress_tx, local_node_record.id).await });

        let udp = Arc::clone(&socket);
        tasks.spawn(async move { send_loop(udp, egress_rx).await });

        let kbuckets = KBucketsTable::new(
            NodeKey::from(&local_node_record).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        );

        let self_lookup_interval = tokio::time::interval(config.lookup_interval);

        // Wait `ping_interval` and then start pinging every `ping_interval`
        let ping_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.ping_interval,
            config.ping_interval,
        );

        let evict_expired_requests_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.request_timeout,
            config.request_timeout,
        );

        //--snip--
    }
    //--snip--
}
```

First, new channels are initialized to handle incoming and outgoing traffic related to node discovery. New tasks are then spawned to run the `receive_loop()` and `send_loop()` functions, which perpetually forward incoming discv4 traffic to the ingress channel and drain outgoing discv4 traffic from the egress channel. Following this, a [`KBucketsTable`](https://docs.rs/discv5/0.1.0/discv5/kbucket/struct.KBucketsTable.html) is initialized to keep track of the node's neighbors throughout the network. If you are unfamiliar with k-buckets, feel free to [follow this link to learn more](https://en.wikipedia.org/wiki/Kademlia#Fixed-size_routing_tables). Finally, the `self_lookup_interval`, `ping_interval` and `evict_expired_requests_interval` are initialized; these determine how often the node queries the network for new neighboring nodes at a low [distance](https://github.com/ethereum/devp2p/blob/master/discv4.md#node-identities) relative to its own, how often it sends outgoing ping packets, and how long it waits before removing inactive neighbors from its records.
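The "distance" that decides which k-bucket a neighbor lands in is the Kademlia log2 distance: the index of the highest bit at which two node IDs differ. A self-contained sketch over 32-byte IDs (illustrative; reth's actual implementation lives in the discv5 k-bucket types):

```rust
/// Kademlia-style log2 distance between two 32-byte node IDs.
/// Returns 0 for identical IDs, otherwise a value in 1..=256: the
/// bit position (from the low end) of the highest differing bit.
pub fn log2_distance(a: &[u8; 32], b: &[u8; 32]) -> u32 {
    for (i, (x, y)) in a.iter().zip(b.iter()).enumerate() {
        let xor = x ^ y;
        if xor != 0 {
            // Full bytes below this one contribute 8 bits each,
            // plus the position of this byte's highest set bit.
            return (32 - 1 - i as u32) * 8 + (8 - xor.leading_zeros());
        }
    }
    0
}
```

Nodes whose IDs share a long common prefix with ours have a small log2 distance and land in the nearby buckets, which is what makes the periodic self-lookup converge on close neighbors.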
Once the `Discv4Service::new()` function completes, allowing the `Discv4::bind()` function to complete as well, the `discv4_service` is spawned into its own task and the `Discovery::new()` function returns a new instance of `Discovery`. The newly created `Discovery` type is passed into the `NetworkState::new()` function along with a few other arguments to initialize the network state. The network state is added to the `NetworkManager`, which is then spawned into its own task as well.

## Polling the Discv4Service and Discovery Events

In Rust, the owner of a [`Future`](https://doc.rust-lang.org/std/future/trait.Future.html#) is responsible for advancing the computation by polling the future. This is done by calling `Future::poll`.
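To make polling concrete before diving into `Discv4Service::poll`, here is a minimal future driven by hand with a no-op waker (a self-contained sketch, not reth code; an executor like tokio normally does this for you):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A toy future that returns `Pending` a fixed number of times.
struct Countdown(u32);

impl Future for Countdown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

/// Build a waker that does nothing when woken (fine for busy-polling).
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Poll the future to completion, counting how many polls it took.
fn drive<F: Future + Unpin>(mut fut: F) -> (u32, F::Output) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (polls, out);
        }
    }
}
```

`Discv4Service::poll` has the same shape: it is called repeatedly with a `Context`, does a slice of work each time, and returns `Poll::Ready` only when it has an event to hand back.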
Let's take a detailed look at how `Discv4Service::poll` works under the hood. This function has many moving parts, so we will break it up into smaller sections.

[File: crates/net/discv4/src/lib.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/discv4/src/lib.rs#L1302)

```rust ignore
impl Discv4Service {
    //--snip--
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
        loop {
            // drain buffered events first
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // trigger self lookup
            if self.config.enable_lookup
                && !self.is_lookup_in_progress()
                && self.lookup_interval.poll_tick(cx).is_ready()
            {
                let target = self.lookup_rotator.next(&self.local_node_record.id);
                self.lookup_with(target, None);
            }

            // evict expired nodes
            if self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                self.evict_expired_requests(Instant::now())
            }

            // re-ping some peers
            if self.ping_interval.poll_tick(cx).is_ready() {
                self.re_ping_oldest();
            }

            if let Some(Poll::Ready(Some(ip))) =
                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
            {
                self.set_external_ip_addr(ip);
            }
            // --snip--
        }
    }
}
```

As the function starts, a `loop` is entered and the `Discv4Service.queued_events` are evaluated to see if there are any events ready to be processed. If there is an event ready, the function immediately returns the event wrapped in `Poll::Ready()`. The `queued_events` field is a `VecDeque<Discv4Event>`, where `Discv4Event` is an enum containing one of the following variants.
[File: crates/net/discv4/src/lib.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/discv4/src/lib.rs#L1455)

```rust ignore
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// A `EnrRequest` message was handled.
    EnrRequest,
    /// A `EnrResponse` message was handled.
    EnrResponse,
}
```

If there is not a `Discv4Event` immediately ready, the function continues by triggering a self lookup and removing any nodes that have not sent a ping request within the allowable time (`now.duration_since(enr_request.sent_at) < Discv4Service.config.ping_expiration`). Additionally, the node re-pings all nodes whose endpoint proofs (explained below) are considered expired. If a node fails to respond to the ping with a pong, it is removed from the `KBucketsTable`.
|
||||
|
||||
To prevent traffic amplification attacks (ie. DNS attacks), implementations must verify that the sender of a query participates in the discovery protocol. The sender of a packet is considered verified if it has sent a valid Pong response with matching ping hash within the last 12 hours. This is called the ["endpoint proof"](https://github.com/ethereum/devp2p/blob/master/discv4.md#endpoint-proof).
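
The 12-hour expiry rule can be sketched with plain `std::time` types. This is a simplified illustration only: `EndpointProof` and `is_verified` are hypothetical names, not reth's actual types.

```rust
use std::time::{Duration, Instant};

/// Per the discv4 spec, a pong only proves the sender's endpoint for 12 hours.
const PROOF_EXPIRATION: Duration = Duration::from_secs(12 * 60 * 60);

/// Hypothetical record of the last verified pong from a peer.
pub struct EndpointProof {
    pub last_pong: Instant,
}

/// A sender counts as verified while its endpoint proof is still fresh;
/// otherwise its queries are ignored until a new ping/pong exchange completes.
pub fn is_verified(proof: &EndpointProof, now: Instant) -> bool {
    now.duration_since(proof.last_pong) < PROOF_EXPIRATION
}
```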

Next, the `Discv4Service` handles all incoming `Discv4Command`s until there are none left to process. Following this, all `IngressEvent`s are handled, which represent the incoming datagrams related to the discv4 protocol. After all events are handled, the node pings the active nodes in its network. This process repeats until all of the `Discv4Event`s in `queued_events` are processed.

In Reth, once a new `NetworkState` is initialized as the node starts up and a new task is spawned to handle the network, the `poll()` function is used to advance the state of the network.

[File: crates/net/network/src/state.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/network/src/state.rs#L377)
```rust ignore
impl<C> NetworkState<C>
where
    C: BlockProvider,
{
    /// Advances the state
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction> {
        loop {
            //--snip--
            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
                self.on_discovery_event(discovery);
            }
            //--snip--
        }
    }
}
```

Every time the network is polled, `Discovery::poll()` is also called to handle all `DiscoveryEvent`s ready to be processed. There are two types of `DiscoveryEvent`s: `DiscoveryEvent::Discovered` and `DiscoveryEvent::EnrForkId`. If a new node is discovered, the new peer is added to the node's peers. If an ENR fork ID is received, the event is pushed to a queue of messages that will later be handled by the `NetworkState`.

Let's recap everything that we covered. The `discv4` crate enables the node to discover new peers across the network. When the node is started, a `NetworkManager` is created, which initializes a new `Discovery` type, which in turn initializes the `Discv4Service`. When the `Discv4Service` is polled, all `Discv4Command`s, `IngressEvent`s and `Discv4Event`s are handled until `queued_events` is empty. This process repeats every time the `NetworkState` is polled, allowing the node to discover and communicate with new peers across the network.
375
docs/crates/eth-wire.md
Normal file

# eth-wire

The `eth-wire` crate provides abstractions over the [RLPx](https://github.com/ethereum/devp2p/blob/master/rlpx.md) and
[Eth wire](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) protocols.

This crate can be thought of as having two components:

1. Data structures which serialize and deserialize the eth protocol messages into Rust-compatible types.
2. Abstractions over Tokio streams which operate on these types.

(Note that ECIES is implemented in a separate `reth-ecies` crate.)

## Types
The most basic eth-wire type is a `ProtocolMessage`. It describes all messages that reth can send/receive.

[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/message.rs)
```rust, ignore
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolMessage {
    pub message_type: EthMessageID,
    pub message: EthMessage,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EthMessage {
    Status(Status),
    NewBlockHashes(NewBlockHashes),
    Transactions(Transactions),
    NewPooledTransactionHashes(NewPooledTransactionHashes),
    GetBlockHeaders(RequestPair<GetBlockHeaders>),
    // ...
    GetReceipts(RequestPair<GetReceipts>),
    Receipts(RequestPair<Receipts>),
}

/// Represents message IDs for eth protocol messages.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EthMessageID {
    Status = 0x00,
    NewBlockHashes = 0x01,
    Transactions = 0x02,
    // ...
    NodeData = 0x0e,
    GetReceipts = 0x0f,
    Receipts = 0x10,
}
```
Messages can either be broadcast to the network, or can be a request/response message to a single peer. This second type of message is
described using a `RequestPair` struct, which is simply a concatenation of the underlying message with a request ID.

[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/message.rs)
```rust, ignore
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestPair<T> {
    pub request_id: u64,
    pub message: T,
}
```
Every `EthMessage` has a corresponding Rust struct which implements the `Encodable` and `Decodable` traits.
These traits are defined as follows:

[Crate: crates/common/rlp](https://github.com/paradigmxyz/reth/blob/main/crates/common/rlp)
```rust, ignore
pub trait Decodable: Sized {
    fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError>;
}
pub trait Encodable {
    fn encode(&self, out: &mut dyn BufMut);
    fn length(&self) -> usize;
}
```
These traits describe how an `EthMessage` should be serialized/deserialized into raw bytes using the RLP format.
In reth, all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by the `common/rlp` and `common/rlp-derive` crates.

Note that the `ProtocolMessage` itself implements these traits, so any stream of bytes can be converted into it by calling `ProtocolMessage::decode()` and vice versa with `ProtocolMessage::encode()`. The message type is determined by the first byte of the byte stream.
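
Since RLP comes up throughout this chapter, a minimal sketch of its two short-payload rules may help: a string of 0–55 bytes gets a `0x80 + len` prefix (a lone byte below `0x80` encodes as itself), and a list whose total payload is under 56 bytes gets a `0xc0 + len` prefix. reth's real implementation in `common/rlp` also handles long payloads; this toy version does not.

```rust
/// Minimal RLP string encoding, valid only for payloads shorter than 56 bytes.
fn rlp_encode_str(s: &[u8]) -> Vec<u8> {
    match s {
        // A single byte below 0x80 is its own RLP encoding.
        [b] if *b < 0x80 => vec![*b],
        _ => {
            assert!(s.len() < 56, "long strings need a length-of-length prefix");
            let mut out = vec![0x80 + s.len() as u8];
            out.extend_from_slice(s);
            out
        }
    }
}

/// Minimal RLP list encoding: encode each item, then prefix with 0xc0 + payload length.
fn rlp_encode_list(items: &[&[u8]]) -> Vec<u8> {
    let payload: Vec<u8> = items.iter().flat_map(|i| rlp_encode_str(i)).collect();
    assert!(payload.len() < 56, "long lists need a length-of-length prefix");
    let mut out = vec![0xc0 + payload.len() as u8];
    out.extend(payload);
    out
}
```

For example, `"dog"` encodes as `0x83` followed by the three bytes, a canonical example from the RLP spec.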

### Example: The Transactions message
Let's understand how an `EthMessage` is implemented by taking a look at the `Transactions` message. The eth specification describes a `Transactions` message as a list of RLP-encoded transactions:

[File: ethereum/devp2p/caps/eth.md](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02)
```
Transactions (0x02)
[tx₁, tx₂, ...]

Specify transactions that the peer should make sure is included on its transaction queue.
The items in the list are transactions in the format described in the main Ethereum specification.
...
```

In reth, this is represented as:

[File: crates/net/eth-wire/src/types/broadcast.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/types/broadcast.rs)
```rust,ignore
pub struct Transactions(
    /// New transactions for the peer to include in its mempool.
    pub Vec<TransactionSigned>,
);
```

And the corresponding trait implementations are present in the primitives crate.

[File: crates/primitives/src/transaction/mod.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/transaction/mod.rs)
```rust, ignore
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)]
pub struct TransactionSigned {
    pub hash: TxHash,
    pub signature: Signature,
    #[deref]
    #[as_ref]
    pub transaction: Transaction,
}

impl Encodable for TransactionSigned {
    fn encode(&self, out: &mut dyn bytes::BufMut) {
        self.encode_inner(out, true);
    }

    fn length(&self) -> usize {
        let len = self.payload_len();
        len + length_of_length(len)
    }
}

impl Decodable for TransactionSigned {
    fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
        // Implementation omitted for brevity
        //...
    }
}
```
Now that we know how the types work, let's take a look at how they are utilized in the network.

## P2PStream
The lowest-level stream used to communicate with other peers is the P2P stream. It takes an underlying Tokio stream and does the following:

- Tracks and manages ping and pong messages, sending them when needed.
- Keeps track of the `SharedCapabilities` between the reth node and its peers.
- Receives bytes from peers, decompresses them and forwards them to its parent stream.
- Receives bytes from its parent stream, compresses them and sends them to peers.

Compression and decompression of bytes is done with the snappy algorithm ([EIP 706](https://eips.ethereum.org/EIPS/eip-706))
using the external `snap` crate.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore
#[pin_project]
pub struct P2PStream<S> {
    #[pin]
    inner: S,
    encoder: snap::raw::Encoder,
    decoder: snap::raw::Decoder,
    pinger: Pinger,
    shared_capability: SharedCapability,
    outgoing_messages: VecDeque<Bytes>,
    disconnecting: bool,
}
```
### Pinger
To manage pinging, an instance of the `Pinger` struct is used. This is a state machine which keeps track of how many pings
we have sent/received and the timeouts associated with them.

[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/pinger.rs)
```rust,ignore
#[derive(Debug)]
pub(crate) struct Pinger {
    /// The timer used for the next ping.
    ping_interval: Interval,
    /// The timer used for the ping timeout.
    timeout_timer: Pin<Box<Sleep>>,
    timeout: Duration,
    state: PingState,
}

/// This represents the possible states of the pinger.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PingState {
    /// There are no pings in flight, or all pings have been responded to.
    Ready,
    /// We have sent a ping and are waiting for a pong, but the peer has missed n pongs.
    WaitingForPong,
    /// The peer has failed to respond to a ping.
    TimedOut,
}
```

State transitions are then implemented like a future, with the `poll_ping` function advancing the state of the pinger.

[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/pinger.rs)
```rust, ignore
pub(crate) fn poll_ping(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Result<PingerEvent, PingerError>> {
    match self.state() {
        PingState::Ready => {
            if self.ping_interval.poll_tick(cx).is_ready() {
                self.timeout_timer.as_mut().reset(Instant::now() + self.timeout);
                self.state = PingState::WaitingForPong;
                return Poll::Ready(Ok(PingerEvent::Ping))
            }
        }
        PingState::WaitingForPong => {
            if self.timeout_timer.is_elapsed() {
                self.state = PingState::TimedOut;
                return Poll::Ready(Ok(PingerEvent::Timeout))
            }
        }
        PingState::TimedOut => {
            return Poll::Pending
        }
    };
    Poll::Pending
}
```
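
The same transitions can be sketched without tokio timers, using plain `Instant` comparisons. `SimplePinger` below is an illustrative stand-in for the real timer-driven implementation, not reth's actual code.

```rust
use std::time::{Duration, Instant};

/// Mirror of reth's `PingState`, driven here by explicit `Instant` checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SimplePingState {
    Ready,
    WaitingForPong,
    TimedOut,
}

pub struct SimplePinger {
    pub state: SimplePingState,
    deadline: Option<Instant>,
    timeout: Duration,
}

impl SimplePinger {
    pub fn new(timeout: Duration) -> Self {
        Self { state: SimplePingState::Ready, deadline: None, timeout }
    }

    /// Ready -> WaitingForPong: a ping goes out and the timeout clock starts.
    pub fn send_ping(&mut self, now: Instant) {
        self.state = SimplePingState::WaitingForPong;
        self.deadline = Some(now + self.timeout);
    }

    /// WaitingForPong -> Ready on a pong, or -> TimedOut once the deadline passes.
    pub fn on_tick(&mut self, now: Instant, got_pong: bool) {
        if self.state == SimplePingState::WaitingForPong {
            if got_pong {
                self.state = SimplePingState::Ready;
            } else if self.deadline.map_or(false, |d| now >= d) {
                self.state = SimplePingState::TimedOut;
            }
        }
    }
}
```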

### Sending and receiving data
To send and receive data, the `P2PStream` itself implements the `Stream` and `Sink` traits from the `futures` crate.

For the `Stream` trait, the `inner` stream is polled and the received bytes are decompressed and returned. Most of the code is just
error handling and is omitted here for clarity.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore
impl<S> Stream for P2PStream<S> {
    type Item = Result<BytesMut, P2PStreamError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        while let Poll::Ready(res) = this.inner.poll_next_unpin(cx) {
            let bytes = match res {
                Some(Ok(bytes)) => bytes,
                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                None => return Poll::Ready(None),
            };
            let decompressed_len = snap::raw::decompress_len(&bytes[1..])?;
            let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1);
            this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?;
            // ... Omitted error handling
            decompress_buf[0] = bytes[0] - this.shared_capability.offset();
            return Poll::Ready(Some(Ok(decompress_buf)))
        }
    }
}
```
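
The final ID adjustment in the code above deserves a note: each shared capability is assigned a contiguous message-ID range after the 16 reserved p2p IDs (`0x00`–`0x0f`), so IDs are shifted by the capability's offset on send and unshifted on receipt. A sketch of that arithmetic, assuming `eth` is the only negotiated capability and therefore has offset `0x10`:

```rust
/// Assumed offset for a single negotiated `eth` capability: its message IDs
/// start right after the 16 reserved p2p message IDs.
pub const ETH_OFFSET: u8 = 0x10;

/// Outgoing: shift an eth message ID into its wire-level range.
pub fn to_wire_id(eth_id: u8) -> u8 {
    eth_id + ETH_OFFSET
}

/// Incoming: shift a wire-level ID back into the eth message ID space.
pub fn from_wire_id(wire_id: u8) -> u8 {
    wire_id - ETH_OFFSET
}
```

With more than one shared capability, each would get its own offset in capability-name order, which is what `SharedCapability::offset()` tracks.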

Similarly, for the `Sink` trait, we do the reverse, compressing and sending data out to the `inner` stream.
The important functions in this trait are shown below.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
impl<S> Sink<Bytes> for P2PStream<S> {
    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
        let this = self.project();
        let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
        let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?;
        compressed.truncate(compressed_size + 1);
        compressed[0] = item[0] + this.shared_capability.offset();
        this.outgoing_messages.push_back(compressed.freeze());
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut this = self.project();
        loop {
            match ready!(this.inner.as_mut().poll_flush(cx)) {
                Err(err) => return Poll::Ready(Err(err.into())),
                Ok(()) => {
                    if let Some(message) = this.outgoing_messages.pop_front() {
                        if let Err(err) = this.inner.as_mut().start_send(message) {
                            return Poll::Ready(Err(err.into()))
                        }
                    } else {
                        return Poll::Ready(Ok(()))
                    }
                }
            }
        }
    }
}
```

## EthStream
The `EthStream` is very simple: it does not keep track of any state, it simply wraps the `P2PStream`.

[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
#[pin_project]
pub struct EthStream<S> {
    #[pin]
    inner: S,
}
```
`EthStream`'s only job is to perform the RLP decoding/encoding, using the `ProtocolMessage::decode()` and `ProtocolMessage::encode()`
functions we looked at earlier.

[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
impl<S, E> Stream for EthStream<S> {
    // ...
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let bytes = ready!(this.inner.poll_next(cx)).unwrap();
        // ...
        let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
            Ok(m) => m,
            Err(err) => {
                return Poll::Ready(Some(Err(err.into())))
            }
        };
        Poll::Ready(Some(Ok(msg.message)))
    }
}

impl<S, E> Sink<EthMessage> for EthStream<S> {
    // ...
    fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
        // ...
        let mut bytes = BytesMut::new();
        ProtocolMessage::from(item).encode(&mut bytes);

        let bytes = bytes.freeze();
        self.project().inner.start_send(bytes)?;
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(cx).map_err(Into::into)
    }
}
```

## Unauthed streams
For a session to be established, peers in the Ethereum network must first exchange a `Hello` message in the RLPx layer and then a
`Status` message in the eth-wire layer.

To perform these handshakes, reth has special `Unauthed` versions of the streams described above.

The `UnauthedP2PStream` performs the `Hello` handshake and returns a `P2PStream`.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
#[pin_project]
pub struct UnauthedP2PStream<S> {
    #[pin]
    inner: S,
}

impl<S> UnauthedP2PStream<S> {
    // ...
    pub async fn handshake(mut self, hello: HelloMessage) -> Result<(P2PStream<S>, HelloMessage), Error> {
        let mut raw_hello_bytes = BytesMut::new();
        P2PMessage::Hello(hello.clone()).encode(&mut raw_hello_bytes);

        self.inner.send(raw_hello_bytes.into()).await?;
        let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next()).await;

        let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) {
            Ok(P2PMessage::Hello(hello)) => Ok(hello),
            // ...
        }?;
        let stream = P2PStream::new(self.inner, capability);

        Ok((stream, their_hello))
    }
}
```
Similarly, `UnauthedEthStream` performs the `Status` handshake and returns an `EthStream`. The code is [here](https://github.com/paradigmxyz/reth/blob/main/crates/net/eth-wire/src/ethstream.rs).
1120
docs/crates/network.md
Normal file
File diff suppressed because it is too large
165
docs/crates/stages.md
Normal file

# Stages

The `stages` crate plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SenderRecoveryStage` and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages is queued up and stored within the Reth pipeline.

[File: crates/stages/src/pipeline.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/pipeline.rs)
```rust,ignore
pub struct Pipeline<DB: Database> {
    stages: Vec<QueuedStage<DB>>,
    max_block: Option<BlockNumber>,
    events_sender: MaybeSender<PipelineEvent>,
}
```

When the node is first started, a new `Pipeline` is initialized and all of the stages are added into `Pipeline.stages`. Then, the `Pipeline::run` function is called, which starts the pipeline, executing all of the stages continuously in an infinite loop. This process syncs the chain, keeping everything up to date with the chain tip.

Each stage within the pipeline implements the `Stage` trait, which provides function interfaces to get the stage ID, execute the stage and unwind the changes to the database if there was an issue during the stage's execution.

[File: crates/stages/src/stage.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stage.rs)
```rust,ignore
pub trait Stage<DB: Database>: Send + Sync {
    /// Get the ID of the stage.
    ///
    /// Stage IDs must be unique.
    fn id(&self) -> StageId;

    /// Execute the stage.
    async fn execute(
        &mut self,
        tx: &mut Transaction<'_, DB>,
        input: ExecInput,
    ) -> Result<ExecOutput, StageError>;

    /// Unwind the stage.
    async fn unwind(
        &mut self,
        tx: &mut Transaction<'_, DB>,
        input: UnwindInput,
    ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}
```
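
The execute-or-unwind control flow the pipeline wraps around these stages can be sketched synchronously. The real `Pipeline` is async and database-backed; the names below are illustrative only, and progress is reduced to a plain block number.

```rust
#[derive(Debug, PartialEq)]
pub enum SimpleStageError {
    Failed,
}

/// A stripped-down, synchronous analogue of the `Stage` trait.
pub trait SimpleStage {
    fn id(&self) -> &'static str;
    fn execute(&mut self, from: u64) -> Result<u64, SimpleStageError>;
    fn unwind(&mut self, to: u64);
}

/// Run every stage in order; if one fails, unwind it back to the last good
/// block and surface the error so the pipeline can resume from a valid state.
pub fn run_once(
    stages: &mut [Box<dyn SimpleStage>],
    mut progress: u64,
) -> Result<u64, SimpleStageError> {
    for stage in stages.iter_mut() {
        match stage.execute(progress) {
            Ok(new_progress) => progress = new_progress,
            Err(err) => {
                stage.unwind(progress);
                return Err(err);
            }
        }
    }
    Ok(progress)
}
```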

To get a better idea of what is happening at each part of the pipeline, let's walk through what is going on under the hood within the `execute()` function at each stage, starting with the `HeaderStage`.

<br>

## HeaderStage

<!-- TODO: Cross-link to eth/65 chapter when it's written -->
The `HeaderStage` is responsible for syncing the block headers, validating the header integrity and writing the headers to the database. When the `execute()` function is called, the local head of the chain is updated to the most recent block height previously executed by the stage. At this point, the node status is also updated with that block's height, hash and total difficulty. These values are used during any new eth/65 handshakes. After updating the head, a stream is established with other peers in the network to sync the missing chain headers between the most recent state stored in the database and the chain tip. The `HeaderStage` contains a `downloader` attribute, which is a type that implements the `HeaderDownloader` trait. The `stream()` method from this trait is used to fetch headers from the network.

[File: crates/interfaces/src/p2p/headers/downloader.rs](https://github.com/paradigmxyz/reth/blob/main/crates/interfaces/src/p2p/headers/downloader.rs)
```rust,ignore
pub trait HeaderDownloader: Downloader {
    /// Stream the headers
    fn stream(&self, head: SealedHeader, tip: H256) -> DownloadStream<'_, SealedHeader>;

    /// Validate whether the header is valid in relation to its parent
    fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
        validate_header_download(self.consensus(), header, parent)?;
        Ok(())
    }
}
```

The `HeaderStage` relies on the downloader stream to return the headers in descending order, starting from the chain tip down to the latest block in the database. While other stages in the `Pipeline` work from the most recent block in the database up to the chain tip, the `HeaderStage` works in reverse to avoid [long-range attacks](https://messari.io/report/long-range-attack). When a node downloads headers in ascending order, it will not know if it is being subjected to a long-range attack until it reaches the most recent blocks. To combat this, the `HeaderStage` starts by getting the chain tip from the consensus layer, verifies the tip, and then walks backwards by the parent hash. Each value yielded from the stream is a `SealedHeader`.
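
That reverse walk can be sketched with hashes reduced to plain integers. This is illustrative only; real consensus validation also checks timestamps, difficulty, gas limits and more.

```rust
/// Toy header: `hash` stands in for the real keccak256 block hash.
#[derive(Clone)]
pub struct ToyHeader {
    pub number: u64,
    pub hash: u64,
    pub parent_hash: u64,
}

/// Walk from a trusted tip downwards: each next header must be the parent of
/// the previously accepted one, with the block number decreasing by one.
pub fn validate_descending(tip: &ToyHeader, rest: &[ToyHeader]) -> bool {
    let mut child = tip.clone();
    for header in rest {
        if child.parent_hash != header.hash || child.number != header.number + 1 {
            return false;
        }
        child = header.clone();
    }
    true
}
```

Because validation is anchored at the verified tip, a forged alternative history fails at the very first mismatched parent hash instead of going undetected until the recent blocks.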

[File: crates/primitives/src/header.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/header.rs)
```rust,ignore
pub struct SealedHeader {
    /// Locked Header fields.
    header: Header,
    /// Locked Header hash.
    hash: BlockHash,
}
```

Each `SealedHeader` is then validated to ensure that it has the proper parent. Note that this is only a basic response validation; the `HeaderDownloader` uses the `validate` method during the `stream`, so that each header is validated according to the consensus specification before it is yielded from the stream. After this, each header is written to the database. If a header is not valid or the stream encounters any other error, the error is propagated up through the stage execution, the changes to the database are unwound and the stage is resumed from the most recent valid state.

This process continues until all of the headers have been downloaded and written to the database. Finally, the total difficulty of the chain's head is updated and the function returns `Ok(ExecOutput { stage_progress: current_progress, reached_tip: true, done: true })`, signaling that the header sync has completed successfully.

<br>

## BodyStage

Once the `HeaderStage` completes successfully, the `BodyStage` will start execution. The body stage downloads block bodies for all of the new block headers that were stored locally in the database. The `BodyStage` first determines which block bodies to download by checking if the block body has an ommers hash and transaction root.

An ommers hash is the Keccak 256-bit hash of the ommers list portion of the block. If you are unfamiliar with ommer blocks, you can [click here to learn more](https://ethereum.org/en/glossary/#ommer). Note that while ommer blocks were important for new blocks created during Ethereum's proof-of-work era, Ethereum's proof-of-stake chain selects exactly one block proposer at a time, so ommer blocks are no longer needed in post-merge Ethereum.

The transactions root is a value that is calculated based on the transactions included in the block. To derive the transactions root, a [merkle tree](https://blog.ethereum.org/2015/11/15/merkling-in-ethereum) is created from the block's transactions list. The transactions root is then derived by taking the Keccak 256-bit hash of the root node of the merkle tree.

When the `BodyStage` is looking at the headers to determine which blocks to download, it will skip the blocks where the `header.ommers_hash` and the `header.transaction_root` are empty, denoting that the block is empty as well.
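
That skip check can be sketched as follows. The two `EMPTY_*` constants stand in for the real keccak256 hashes of an empty ommers list and an empty transactions trie; the values below are placeholders, not the real hashes.

```rust
/// Placeholder for keccak256 of the RLP-encoded empty ommers list.
pub const EMPTY_OMMERS_HASH: u64 = 0xE1;
/// Placeholder for the root hash of an empty transactions trie.
pub const EMPTY_TX_ROOT: u64 = 0xE2;

pub struct ToyBlockHeader {
    pub ommers_hash: u64,
    pub transaction_root: u64,
}

/// A block whose header commits to no ommers and no transactions has an empty
/// body, so the `BodyStage` has nothing to download for it.
pub fn needs_body_download(header: &ToyBlockHeader) -> bool {
    !(header.ommers_hash == EMPTY_OMMERS_HASH && header.transaction_root == EMPTY_TX_ROOT)
}
```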

Once the `BodyStage` determines which block bodies to fetch, a new `bodies_stream` is created which downloads all of the bodies from the `starting_block` up until the `target_block` specified. Each time the `bodies_stream` yields a value, a `SealedBlock` is created using the block header, the ommers hash and the newly downloaded block body.

[File: crates/primitives/src/block.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/block.rs)
```rust,ignore
pub struct SealedBlock {
    /// Locked block header.
    pub header: SealedHeader,
    /// Transactions with signatures.
    pub body: Vec<TransactionSigned>,
    /// Ommer/uncle headers
    pub ommers: Vec<SealedHeader>,
}
```

The new block is then pre-validated, checking that the ommers hash and transactions root in the block header match those of the block body. Following a successful pre-validation, the `BodyStage` loops through each transaction in the `block.body`, adding the transaction to the database. This process is repeated for every downloaded block body, with the `BodyStage` returning `Ok(ExecOutput { stage_progress: highest_block, reached_tip: true, done })` to signal that it completed successfully.

<br>

## SenderRecoveryStage

Following a successful `BodyStage`, the `SenderRecoveryStage` starts to execute. The `SenderRecoveryStage` is responsible for recovering the transaction sender for each of the transactions newly added to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderRecoveryStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP-encoded transaction bytes. This hash is then passed into the `recover_signer` function.

[File: crates/primitives/src/transaction/signature.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/transaction/signature.rs)
```rust,ignore
pub(crate) fn recover_signer(&self, hash: H256) -> Option<Address> {
    let mut sig: [u8; 65] = [0; 65];

    self.r.to_big_endian(&mut sig[0..32]);
    self.s.to_big_endian(&mut sig[32..64]);
    sig[64] = self.odd_y_parity as u8;

    secp256k1::recover(&sig, hash.as_fixed_bytes()).ok()
}
```

In an [ECDSA (Elliptic Curve Digital Signature Algorithm) signature](https://wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm), the "r", "s" and "v" values are three pieces of data that are used to mathematically verify the authenticity of a digital signature. ECDSA is a widely used algorithm for generating and verifying digital signatures, and it is often used in cryptocurrencies like Ethereum.

The "r" value is the x-coordinate of a point on the elliptic curve that is calculated as part of the signature process. The "s" value is the second half of the signature, derived from the private key and the message being signed. Lastly, the "v" value is the "recovery id", which identifies which candidate curve point corresponds to the signer's public key, allowing the public key to be recovered from the signature and the signed message. Together, the "r", "s" and "v" values make up an ECDSA signature, and they are used to verify the authenticity of the signed transaction.
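
The byte layout assembled by `recover_signer` above, with r and s written big-endian into the first 64 bytes and the parity bit last, can be shown in isolation. This sketch uses raw byte arrays instead of the U256 types reth uses.

```rust
/// Pack r, s and the y-parity bit into the 65-byte recoverable-signature
/// layout expected by secp256k1 public-key recovery.
pub fn pack_signature(r: [u8; 32], s: [u8; 32], odd_y_parity: bool) -> [u8; 65] {
    let mut sig = [0u8; 65];
    sig[0..32].copy_from_slice(&r);  // r, big-endian
    sig[32..64].copy_from_slice(&s); // s, big-endian
    sig[64] = odd_y_parity as u8;    // recovery id: 0 or 1
    sig
}
```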

Once the transaction signer has been recovered, the signer is added to the database. This process is repeated for every transaction that was retrieved, and similarly to previous stages, `Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true })` is returned to signal a successful completion of the stage.

<br>

## ExecutionStage

Finally, after all headers, bodies and senders are added to the database, the `ExecutionStage` starts to execute. This stage is responsible for executing all of the transactions and updating the state stored in the database. For every new block header added to the database, the corresponding transactions have their signers attached to them and `reth_executor::executor::execute_and_verify_receipt()` is called, pushing the state changes resulting from the execution to a `Vec`.

[File: crates/stages/src/stages/execution.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages/execution.rs)
```rust,ignore
reth_executor::executor::execute_and_verify_receipt(
    header,
    &recovered_transactions,
    ommers,
    &self.config,
    state_provider,
)
```

After all headers and their corresponding transactions have been executed, all of the resulting state changes are applied to the database, updating account balances, account bytecode and other state values. After applying all of the execution state changes, if there was a block reward, it is applied to the validator's account.

At the end of the `execute()` function, a familiar value is returned: `Ok(ExecOutput { done: is_done, reached_tip: true, stage_progress: last_block })`, signaling a successful completion of the `ExecutionStage`.

<br>

# Next Chapter

Now that we have covered all of the stages that are currently included in the `Pipeline`, you know how the Reth client stays synced with the chain tip and updates the database with all of the new headers, bodies, senders and state changes. While this chapter provides an overview of how the pipeline stages work, the following chapters will dive deeper into the database, the networking stack and other exciting corners of the Reth codebase. Feel free to check out any parts of the codebase mentioned in this chapter, and when you are ready, the next chapter will dive into the `database`.

[Next Chapter]()