mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Moves code walkthrough book chapters to docs (#629)
* replaced template blocks with code blocks in stages chapter * replaced template blocks with code blocks in network chapter * moved book sections to docs * fix indentation in recover_signer codeblock * remove unnecessary TODO comment in network.md
This commit is contained in:
@ -105,7 +105,6 @@ impl Command {
|
||||
let genesis_hash = init_genesis(db.clone(), self.chain.genesis.clone())?;
|
||||
|
||||
info!("Connecting to p2p");
|
||||
// ANCHOR: snippet-execute
|
||||
let network = start_network(network_config(db.clone(), chain_id, genesis_hash)).await?;
|
||||
|
||||
// TODO: Are most of these Arcs unnecessary? For example, fetch client is completely
|
||||
@ -155,7 +154,6 @@ impl Command {
|
||||
// Run pipeline
|
||||
info!("Starting pipeline");
|
||||
pipeline.run(db.clone()).await?;
|
||||
// ANCHOR_END: snippet-execute
|
||||
|
||||
info!("Finishing up");
|
||||
Ok(())
|
||||
@ -224,7 +222,6 @@ fn network_config<DB: Database>(
|
||||
}
|
||||
|
||||
/// Starts the networking stack given a [NetworkConfig] and returns a handle to the network.
|
||||
// ANCHOR: fn-start_network
|
||||
async fn start_network<C>(config: NetworkConfig<C>) -> Result<NetworkHandle, NetworkError>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider + 'static,
|
||||
@ -238,4 +235,3 @@ where
|
||||
tokio::task::spawn(eth);
|
||||
Ok(handle)
|
||||
}
|
||||
// ANCHOR_END: fn-start_network
|
||||
|
||||
@ -17,8 +17,5 @@ The book is continuously rendered [here](https://paradigmxyz.github.io/reth/)!
|
||||
To get started with Reth, install, configure and sync your node.
|
||||
* To install and build reth, you can use the following [installation instruction](./installation.md).
|
||||
|
||||
**[A Tour Of Reth]()**
|
||||
|
||||
This section will take a deep dive into the inner workings of Reth.
|
||||
|
||||
[gh-book]: https://github.com/paradigmxyz/reth/tree/main/book
|
||||
@ -7,44 +7,3 @@
|
||||
<!-- An overview of all the flags, how they work and how to configure the node -->
|
||||
- [Configuring The Node]()
|
||||
- [Running Reth]()
|
||||
|
||||
# A Tour Of Reth
|
||||
|
||||
- [Database]()
|
||||
- [codecs]()
|
||||
- [libmdbx-rs]()
|
||||
- [db](./db/README.md)
|
||||
- [Networking]()
|
||||
- [P2P](./networking/p2p/README.md)
|
||||
- [network](./networking/p2p/network/README.md)
|
||||
- [eth-wire]()
|
||||
- [discv4]()
|
||||
- [ipc]()
|
||||
- [RPC]()
|
||||
- [rpc-api]()
|
||||
- [rpc]()
|
||||
- [rpc-types]()
|
||||
- [Downloaders]()
|
||||
- [bodies-downloaders]()
|
||||
- [headers-downloaders]()
|
||||
- [Ethereum]()
|
||||
- [executor]()
|
||||
- [consensus]()
|
||||
- [transaction-pool]()
|
||||
- [Staged Sync]()
|
||||
- [stages](./stages/README.md)
|
||||
- [Primitives]()
|
||||
- [primitives]()
|
||||
- [rlp]()
|
||||
- [rlp-derive]()
|
||||
- [Misc]()
|
||||
- [interfaces]()
|
||||
- [tracing]()
|
||||
- [crate-template]()
|
||||
- [examples]()
|
||||
|
||||
|
||||
|
||||
# Design
|
||||
|
||||
- [Goals](./design/goals.md)
|
||||
|
||||
@ -1 +0,0 @@
|
||||
# network
|
||||
@ -1 +0,0 @@
|
||||
# P2P
|
||||
@ -1,327 +0,0 @@
|
||||
# Network
|
||||
|
||||
The `network` crate is responsible for managing the node's connection to the Ethereum peer-to-peer (P2P) network, enabling communication with other nodes via the [various P2P subprotocols](https://github.com/ethereum/devp2p).
|
||||
|
||||
Reth's P2P networking consists primarily of 4 ongoing tasks:
|
||||
- **Discovery**: Discovers new peers in the network
|
||||
- **Transactions**: Accepts, requests, and broadcasts mempool transactions
|
||||
- **ETH Requests**: Responds to incoming requests for headers and bodies
|
||||
- **Network Management**: Handles incoming & outgoing connections with peers, and routes requests between peers and the other tasks
|
||||
|
||||
We'll leave most of the discussion of the discovery task for the [discv4](../discv4/README.md) chapter, and will focus on the other three here.
|
||||
|
||||
Let's take a look at how the main Reth CLI (i.e., a default-configured full node) makes use of the P2P layer to explore the primary interfaces and entrypoints into the `network` crate.
|
||||
|
||||
---
|
||||
|
||||
## The Network Management Task
|
||||
|
||||
The network management task is the one primarily used in the pipeline to interact with the P2P network. Apart from managing connectivity to the node's peers, it provides a couple of interfaces for sending _outbound_ requests.
|
||||
|
||||
Let's take a look at what the provided interfaces are, how they're used in the pipeline, and take a brief glance under the hood to highlight some important structs and traits in the network management task.
|
||||
|
||||
### Use of the Network in the Node
|
||||
|
||||
The `"node"` CLI command, used to run the node itself, does the following at a high level:
|
||||
1. Initializes the DB
|
||||
2. Initializes the consensus API
|
||||
3. Writes the genesis block to the DB
|
||||
4. Initializes the network
|
||||
5. Instantiates a client for fetching data from the network
|
||||
6. Configures the pipeline by adding stages to it
|
||||
7. Runs the pipeline
|
||||
|
||||
Steps 5-6 are of interest to us as they consume items from the `network` crate:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=bin/reth/src/node/mod.rs anchor=snippet-execute}}
|
||||
|
||||
Let's begin by taking a look at the line where the network is started, with the call, unsurprisingly, to `start_network`. Sounds important, doesn't it?
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=bin/reth/src/node/mod.rs anchor=fn-start_network}}
|
||||
|
||||
At a high level, this function is responsible for starting the tasks listed at the start of this chapter.
|
||||
|
||||
It gets the handles for the network management, transactions, and ETH requests tasks downstream of the `NetworkManager::builder` method call, and spawns them.
|
||||
|
||||
The `NetworkManager::builder` constructor requires a `NetworkConfig` struct to be passed in as a parameter, which can be used as the main entrypoint for setting up the entire network layer:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/config.rs anchor=struct-NetworkConfig}}
|
||||
|
||||
The discovery task progresses as the network management task is polled, handling events regarding peer management through the `Swarm` struct which is stored as a field on the `NetworkManager`:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/swarm.rs anchor=struct-Swarm}}
|
||||
|
||||
The `Swarm` struct glues together incoming connections from peers, managing sessions with peers, and recording the network's state (e.g. number of active peers, genesis hash of the network, etc.). It emits these as `SwarmEvent`s to the `NetworkManager`, and routes commands and events between the `SessionManager` and `NetworkState` structs that it holds.
|
||||
|
||||
We'll touch more on the `NetworkManager` shortly! It's perhaps the most important struct in this crate.
|
||||
|
||||
More information about the discovery task can be found in the [discv4](../discv4/README.md) chapter.
|
||||
|
||||
The ETH requests and transactions task will be explained in their own sections, following this one.
|
||||
|
||||
The variable `network` returned from `start_network` and the variable `fetch_client` returned from `network.fetch_client` are of types `NetworkHandle` and `FetchClient`, respectively. These are the two main interfaces for interacting with the P2P network, and are currently used in the `HeaderStage` and `BodyStage`.
|
||||
|
||||
Let's walk through how each is implemented, and then apply that knowledge to understand how they are used in the pipeline. In doing so, we'll dig deeper under the hood inside the network management task to get a sense of what's going on.
|
||||
|
||||
### Interacting with the Network Management Task Using `NetworkHandle`
|
||||
|
||||
The `NetworkHandle` struct is a client for the network management task that can be shared across threads. It wraps an `Arc` around the `NetworkInner` struct, defined as follows:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/network.rs anchor=struct-NetworkInner}}
|
||||
|
||||
The field of note here is `to_manager_tx`, which is a handle that can be used to send messages in a channel to an instance of the `NetworkManager` struct.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/manager.rs anchor=struct-NetworkManager}}
|
||||
|
||||
Now we're getting to the meat of the `network` crate! The `NetworkManager` struct represents the "Network Management" task described above. It is implemented as an endless [`Future`](https://doc.rust-lang.org/std/future/trait.Future.html) that can be thought of as a "hub" process which listens for messages from the `NetworkHandle` or the node's peers and dispatches messages to the other tasks, while keeping track of the state of the network.
|
||||
|
||||
While the `NetworkManager` is meant to be spawned as a standalone [`tokio::task`](https://docs.rs/tokio/0.2.4/tokio/task/index.html), the `NetworkHandle` can be passed around and shared, enabling access to the `NetworkManager` from anywhere by sending requests & commands through the appropriate channels.
|
||||
|
||||
#### Usage of `NetworkHandle` in the Pipeline
|
||||
|
||||
In the pipeline, the `NetworkHandle` is used to instantiate the `FetchClient` - which we'll get into next - and is used in the `HeaderStage` to update the node's ["status"](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00) (record the the total difficulty, hash, and height of the last processed block).
|
||||
|
||||
[File: crates/stages/src/stages/headers.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages/headers.rs)
|
||||
```rust,ignore
|
||||
async fn update_head<DB: Database>(
|
||||
&self,
|
||||
tx: &Transaction<'_, DB>,
|
||||
height: BlockNumber,
|
||||
) -> Result<(), StageError> {
|
||||
// --snip--
|
||||
self.network_handle.update_status(height, block_key.hash(), td);
|
||||
// --snip--
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
Now that we have some understanding about the internals of the network management task, let's look at a higher-level abstraction that can be used to retrieve data from other peers: the `FetchClient`.
|
||||
|
||||
### Using `FetchClient` to Get Data in the Pipeline Stages
|
||||
|
||||
The `FetchClient` struct, similar to `NetworkHandle`, can be shared across threads, and is a client for fetching data from the network. It's a fairly lightweight struct:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/fetch/client.rs anchor=struct-FetchClient}}
|
||||
|
||||
The `request_tx` field is a handle to a channel that can be used to send requests for downloading data, and the `peers_handle` field is a wrapper struct around a handle to a channel that can be used to send messages for applying manual changes to the peer set.
|
||||
|
||||
#### Instantiating the `FetchClient`
|
||||
|
||||
The fields `request_tx` and `peers_handle` are cloned off of the `StateFetcher` struct when instantiating the `FetchClient`, which is the lower-level struct responsible for managing data fetching operations over the network:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/fetch/mod.rs anchor=struct-StateFetcher}}
|
||||
|
||||
This struct itself is nested deeply within the `NetworkManager`: its `Swarm` struct (shown earlier in the chapter) contains a `NetworkState` struct that has the `StateFetcher` as a field:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/state.rs anchor=struct-NetworkState}}
|
||||
|
||||
#### Usage of `FetchClient` in the Pipeline
|
||||
|
||||
The `FetchClient` implements the `HeadersClient` and `BodiesClient` traits, defining the funcionality to get headers and block bodies from available peers.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/fetch/client.rs anchor=trait-HeadersClient-BodiesClient}}
|
||||
|
||||
This functionality is used in the `HeaderStage` and `BodyStage`, respectively.
|
||||
|
||||
In the pipeline used by the main Reth binary, the `HeaderStage` uses a `LinearDownloader` to stream headers from the network:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/downloaders/src/headers/linear.rs anchor=struct-LinearDownloader}}
|
||||
|
||||
A `FetchClient` is passed in to the `client` field, and the `get_headers` method it implements gets used when polling the stream created by the `LinearDownloader` in the `execute` method of the `HeaderStage`.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/downloaders/src/headers/linear.rs anchor=fn-get_or_init_fut}}
|
||||
|
||||
In the `BodyStage` configured by the main binary, a `ConcurrentDownloader` is used:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/downloaders/src/bodies/concurrent.rs anchor=struct-ConcurrentDownloader}}
|
||||
|
||||
Here, similarly, a `FetchClient` is passed in to the `client` field, and the `get_block_bodies` method it implements is used when constructing the stream created by the `ConcurrentDownloader` in the `execute` method of the `BodyStage`.
|
||||
|
||||
[File: crates/net/downloaders/src/bodies/concurrent.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/downloaders/src/bodies/concurrent.rs)
|
||||
```rust,ignore
|
||||
async fn fetch_bodies(
|
||||
&self,
|
||||
headers: Vec<&SealedHeader>,
|
||||
) -> DownloadResult<Vec<BlockResponse>> {
|
||||
// --snip--
|
||||
let (peer_id, bodies) =
|
||||
self.client.get_block_bodies(headers_with_txs_and_ommers).await?.split();
|
||||
// --snip--
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ETH Requests Task
|
||||
|
||||
The ETH requests task serves _incoming_ requests related to blocks in the [`eth` P2P subprotocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages) from other peers.
|
||||
|
||||
Similar to the network management task, it's implemented as an endless future, but it is meant to run as a background task (on a standalone `tokio::task`) and not to be interacted with directly from the pipeline. It's represented by the following `EthRequestHandler` struct:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/eth_requests.rs anchor=struct-EthRequestHandler}}
|
||||
|
||||
The `client` field here is a client that's used to fetch data from the database, not to be confused with the `client` field on a downloader like the `LinearDownloader` discussed above, which is a `FetchClient`.
|
||||
|
||||
### Input Streams to the ETH Requests Task
|
||||
|
||||
The `incoming_requests` field is the receiver end of a channel that accepts, as you might have guessed, incoming ETH requests from peers. The sender end of this channel is stored on the `NetworkManager` struct as the `to_eth_request_handler` field.
|
||||
|
||||
As the `NetworkManager` is polled and listens for events from peers passed through the `Swarm` struct it holds, it sends any received ETH requests into the channel.
|
||||
|
||||
### The Operation of the ETH Requests Task
|
||||
|
||||
Being an endless future, the core of the ETH requests task's functionality is in its `poll` method implementation. As the `EthRequestHandler` is polled, it listens for any ETH requests coming through the channel, and handles them accordingly. At the time of writing, the ETH requests task can handle the [`GetBlockHeaders`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockheaders-0x03) and [`GetBlockBodies`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockbodies-0x05) requests.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/eth_requests.rs anchor=fn-poll}}
|
||||
|
||||
The handling of these requests is fairly straightforward. The `GetBlockHeaders` payload is the following:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/eth-wire/src/types/blocks.rs anchor=struct-GetBlockHeaders}}
|
||||
|
||||
In handling this request, the ETH requests task attempts, starting with `start_block`, to fetch the associated header from the database, increment/decrement the block number to fetch by `skip` depending on the `direction` while checking for overflow/underflow, and checks that bounds specifying the maximum numbers of headers or bytes to send have not been breached.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/eth_requests.rs anchor=fn-get_headers_response}}
|
||||
|
||||
The `GetBlockBodies` payload is simpler, it just contains a vector of requested block hashes:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/eth-wire/src/types/blocks.rs anchor=struct-GetBlockBodies}}
|
||||
|
||||
In handling this request, similarly, the ETH requests task attempts, for each hash in the requested order, to fetch the block body (transactions & ommers), while checking that bounds specifying the maximum numbers of bodies or bytes to send have not been breached.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/eth_requests.rs anchor=fn-on_bodies_request}}
|
||||
|
||||
---
|
||||
|
||||
## Transactions Task
|
||||
|
||||
The transactions task listens for, requests, and propagates transactions both from the node's peers, and those that are added locally (e.g., submitted via RPC). Note that this task focuses solely on the network communication involved with Ethereum transactions, we will talk more about the structure of the transaction pool itself
|
||||
in the [transaction-pool](../../../ethereum/transaction-pool/README.md) chapter.
|
||||
|
||||
Again, like the network management and ETH requests tasks, the transactions task is implemented as an endless future that runs as a background task on a standalone `tokio::task`. It's represented by the `TransactionsManager` struct:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=struct-TransactionsManager}}
|
||||
|
||||
Unlike the ETH requests task, but like the network management task's `NetworkHandle`, the transactions task can also be accessed via a shareable "handle" struct, the `TransactionsHandle`:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=struct-TransactionsHandle}}
|
||||
|
||||
### Input Streams to the Transactions Task
|
||||
|
||||
We'll touch on most of the fields in the `TransactionsManager` as the chapter continues, but some worth noting now are the 4 streams from which inputs to the task are fed:
|
||||
- `transaction_events`: A listener for `NetworkTransactionEvent`s sent from the `NetworkManager`, which consist solely of events related to transactions emitted by the network.
|
||||
- `network_events`: A listener for `NetworkEvent`s sent from the `NetworkManager`, which consist of other "meta" events such as sessions with peers being established or closed.
|
||||
- `command_rx`: A listener for `TransactionsCommand`s sent from the `TransactionsHandle`
|
||||
- `pending`: A listener for new pending transactions added to the `TransactionPool`
|
||||
|
||||
Let's get a view into the transactions task's operation by walking through the `TransactionManager::poll` method.
|
||||
|
||||
### The Operation of the Transactions Task
|
||||
|
||||
The `poll` method lays out an order of operations for the transactions task. It begins by draining the `TransactionsManager.network_events`, `TransactionsManager.command_rx`, and `TransactionsManager.transaction_events` streams, in this order.
|
||||
Then, it checks on all the current `TransactionsManager.inflight_requests`, which are requests sent by the node to its peers for full transaction objects. After this, it checks on the status of completed `TransactionsManager.pool_imports` events, which are transactions that are being imported into the node's transaction pool. Finally, it drains the new `TransactionsManager.pending_transactions` events from the transaction pool.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-poll}}
|
||||
|
||||
Let's go through the handling occurring during each of these steps, in order, starting with the draining of the `TransactionsManager.network_events` stream.
|
||||
|
||||
#### Handling `NetworkEvent`s
|
||||
|
||||
The `TransactionsManager.network_events` stream is the first to have all of its events processed because it contains events concerning peer sessions opening and closing. This ensures, for example, that new peers are tracked in the `TransactionsManager` before events sent from them are processed.
|
||||
|
||||
The events received in this channel are of type `NetworkEvent`:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/manager.rs anchor=enum-NetworkEvent}}
|
||||
|
||||
They're handled with the `on_network_event` method, which responds to the two variants of the `NetworkEvent` enum in the following ways:
|
||||
|
||||
**`NetworkEvent::SessionClosed`**
|
||||
Removes the peer given by `NetworkEvent::SessionClosed.peer_id` from the `TransactionsManager.peers` map.
|
||||
|
||||
**`NetworkEvent::SessionEstablished`**
|
||||
Begins by inserting a `Peer` into `TransactionsManager.peers` by `peer_id`, which is a struct of the following form:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=struct-Peer}}
|
||||
|
||||
Note that the `Peer` struct contains a field `transactions`, which is an [LRU cache](https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)) of the transactions this peer is aware of.
|
||||
|
||||
The `request_tx` field on the `Peer` is used the sender end of a channel to send requests to the session with the peer.
|
||||
|
||||
After the `Peer` is added to `TransactionsManager.peers`, the hashes of all of the transactions in the node's transaction pool are sent to the peer in a [`NewPooledTransactionHashes` message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x08).
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-on_network_event}}
|
||||
|
||||
#### Handling `TransactionsCommand`s
|
||||
|
||||
Next in the `poll` method, `TransactionsCommand`s sent through the `TransactionsManager.command_rx` stream are handled. These are the next to be handled as they are those sent manually via the `TransactionsHandle`, giving them precedence over transactions-related requests picked up from the network. The `TransactionsCommand` enum has the following form:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=enum-TransactionsCommand}}
|
||||
|
||||
`TransactionsCommand`s are handled by the `on_command` method. This method responds to the, at the time of writing, sole variant of the `TransactionsCommand` enum, `TransactionsCommand::PropagateHash`, with the `on_new_transactions` method, passing in an iterator consisting of the single hash contained by the variant (though this method can be called with many transaction hashes).
|
||||
|
||||
`on_new_transactions` propagates the full transaction object, with the signer attached, to a small random sample of peers using the `propagate_transactions` method. Then, it notifies all other peers of the hash of the new transaction, so that they can request the full transaction object if they don't already have it.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-on_new_transactions-propagate_transactions}}
|
||||
|
||||
#### Handling `NetworkTransactionEvent`s
|
||||
|
||||
After `TransactionsCommand`s, it's time to take care of transactions-related requests sent by peers in the network, so the `poll` method handles `NetworkTransactionEvent`s received through the `TransactionsManager.transaction_events` stream. `NetworkTransactionEvent` has the following form:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=enum-NetworkTransactionEvent}}
|
||||
|
||||
These events are handled with the `on_network_tx_event` method, which responds to the variants of the `NetworkTransactionEvent` enum in the following ways:
|
||||
|
||||
**`NetworkTransactionEvent::IncomingTransactions`**
|
||||
|
||||
This event is generated from the [`Transactions` protocol message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02), and is handled by the `import_transactions` method.
|
||||
|
||||
Here, for each transaction in the variant's `msg` field, we attempt to recover the signer, insert the transaction into LRU cache of the `Peer` identified by the variant's `peer_id` field, and add the `peer_id` to the vector of peer IDs keyed by the transaction's hash in `TransactionsManager.transactions_by_peers`. If an entry does not already exist for the transaction hash, then it begins importing the transaction object into the node's transaction pool, adding a `PoolImportFuture` to `TransactionsManager.pool_imports`. If there was an issue recovering the signer, `report_bad_message` is called for the `peer_id`, which decreases the peer's reputation.
|
||||
|
||||
To understand this a bit better, let's double back and examine what `TransactionsManager.transactions_by_peers` and `TransactionsManager.pool_imports` are used for.
|
||||
|
||||
`TransactionsManager.transactions_by_peers` is a `HashMap<TxHash, Vec<PeerId>>`, tracks which peers have sent us a transaction with the given hash. This has two uses: the first being that it prevents us from redundantly importing transactions into the transaction pool for which we've already begun this process (this check occurs in `import_transactions`), and the second being that if a transaction we receive is malformed in some way and ends up erroring when imported to the transaction pool, we can reduce the reputation score for all of the peers that sent us this transaction (this occurs in `on_bad_import`, which we'll touch on soon).
|
||||
|
||||
`TransactionsManager.pool_imports` is a set of futures representing the transactions which are currently in the process of being imported to the node's transaction pool. This process is asynchronous due to the validation of the transaction that must occur, thus we need to keep a handle on the generated future.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-import_transactions}}
|
||||
|
||||
**`NetworkTransactionEvent::IncomingPooledTransactionHashes`**
|
||||
|
||||
This event is generated from the [`NewPooledTransactionHashes` protocol message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x08), and is handled by the `on_new_pooled_transactions` method.
|
||||
|
||||
Here, it begins by adding the transaction hashes included in the `NewPooledTransactionHashes` payload to the LRU cache for the `Peer` identified by `peer_id` in `TransactionsManager.peers`. Next, it filters the list of hashes to those that are not already present in the transaction pool, and for each such hash, requests its full transaction object from the peer by sending it a [`GetPooledTransactions` protocol message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09) through the `Peer.request_tx` channel. If the request was successfully sent, a `GetPooledTxRequest` gets added to `TransactionsManager.inflight_requests` vector:
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=struct-GetPooledTxRequest}}
|
||||
|
||||
As you can see, this struct also contains a `response` channel from which the peer's response can later be polled.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-on_new_pooled_transactions}}
|
||||
|
||||
**`NetworkTransactionEvent::GetPooledTransactions`**
|
||||
|
||||
This event is generated from the [`GetPooledTransactions` protocol message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09), and is handled by the `on_get_pooled_transactions` method.
|
||||
|
||||
Here, it collects _all_ the transactions in the node's transaction pool, recovers their signers, adds their hashes to the LRU cache of the requesting peer, and sends them to the peer in a [`PooledTransactions` protocol message](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#pooledtransactions-0x0a). This is sent through the `response` channel that's stored as a field of the `NetworkTransaction::GetPooledTransactions` variant itself.
|
||||
|
||||
{{#template ../../../templates/source_and_github.md path_to_root=../../../../ path=crates/net/network/src/transactions.rs anchor=fn-on_get_pooled_transactions}}
|
||||
|
||||
#### Checking on `inflight_requests`
|
||||
|
||||
Once all the network activity is handled by draining `TransactionsManager.network_events`, `TransactionsManager.command_rx`, and `TransactionsManager.transaction_events` streams, the `poll` method moves on to checking the status of all `inflight_requests`.
|
||||
|
||||
Here, for each in-flight request, `GetPooledTxRequest.response` field gets polled. If the request is still pending, it remains in the `TransactionsManager.inflight_requests` vector. If the request successfully received a `PooledTransactions` response from the peer, they get handled by the `import_transactions` method (described above). Otherwise, if there was some error in polling the response, we call `report_bad_message` (also described above) on the peer's ID.
|
||||
|
||||
#### Checking on `pool_imports`
|
||||
|
||||
When the last round of `PoolImportFuture`s has been added to `TransactionsManager.pool_imports` after handling the completed `inflight_requests`, the `poll` method continues by checking the status of the `pool_imports`.
|
||||
|
||||
It iterates over `TransactionsManager.pool_imports`, polling each one, and if it's ready (i.e., the future has resolved), it handles successful and unsuccessful import results respectively with `on_good_import` and `on_bad_import`.
|
||||
|
||||
`on_good_import`, called when the transaction was successfully imported into the transaction pool, removes the entry for the given transaction hash from `TransactionsManager.transactions_by_peers`.
|
||||
|
||||
`on_bad_import` also removes the entry for the given transaction hash from `TransactionsManager.transactions_by_peers`, but also calls `report_bad_message` for each peer in the entry, decreasing all of their reputation scores as they were propagating a transaction that could not validated.
|
||||
|
||||
#### Checking on `pending_transactions`
|
||||
|
||||
Finally, the last thing for the `poll` method to do is to drain the `TransactionsManager.pending_transactions` stream. These transactions are those that were added either via propagation from a peer, the handling of which has been laid out above, or via RPC on the node itself, and which were successfully validated and added to the transaction pool.
|
||||
|
||||
It polls `TransactionsManager.pending_transactions`, collecting each resolved transaction into a vector, and calls `on_new_transactions` with said vector. The functionality of the `on_new_transactions` method is described above in the handling of `TransactionsCommand::PropagateHash`.
|
||||
@ -12,7 +12,6 @@ use reth_primitives::{SealedHeader, H256};
|
||||
///
|
||||
/// A downloader represents a distinct strategy for submitting requests to download block headers,
|
||||
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
|
||||
// ANCHOR: trait-HeaderDownloader
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait HeaderDownloader: Downloader {
|
||||
/// Stream the headers
|
||||
@ -24,7 +23,6 @@ pub trait HeaderDownloader: Downloader {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: trait-HeaderDownloader
|
||||
|
||||
/// Validate whether the header is valid in relation to it's parent
|
||||
///
|
||||
|
||||
@ -17,7 +17,6 @@ use std::{borrow::Borrow, sync::Arc};
|
||||
/// Downloads bodies in batches.
|
||||
///
|
||||
/// All blocks in a batch are fetched at the same time.
|
||||
// ANCHOR: struct-ConcurrentDownloader
|
||||
#[derive(Debug)]
|
||||
pub struct ConcurrentDownloader<Client, Consensus> {
|
||||
/// The bodies client
|
||||
@ -31,7 +30,6 @@ pub struct ConcurrentDownloader<Client, Consensus> {
|
||||
/// The maximum number of requests to send concurrently.
|
||||
concurrency: usize,
|
||||
}
|
||||
// ANCHOR_END: struct-ConcurrentDownloader
|
||||
|
||||
impl<Client, Consensus> Downloader for ConcurrentDownloader<Client, Consensus>
|
||||
where
|
||||
|
||||
@ -21,7 +21,6 @@ use std::{
|
||||
};
|
||||
|
||||
/// Download headers in batches
|
||||
// ANCHOR: struct-LinearDownloader
|
||||
#[derive(Debug)]
|
||||
pub struct LinearDownloader<C, H> {
|
||||
/// The consensus client
|
||||
@ -33,7 +32,6 @@ pub struct LinearDownloader<C, H> {
|
||||
/// The number of retries for downloading
|
||||
pub request_retries: usize,
|
||||
}
|
||||
// ANCHOR_END: struct-LinearDownloader
|
||||
|
||||
impl<C, H> Downloader for LinearDownloader<C, H>
|
||||
where
|
||||
@ -194,7 +192,6 @@ where
|
||||
}
|
||||
|
||||
/// Get a current future or instantiate a new one
|
||||
// ANCHOR: fn-get_or_init_fut
|
||||
fn get_or_init_fut(&mut self) -> HeadersRequestFuture {
|
||||
match self.request.take() {
|
||||
None => {
|
||||
@ -215,7 +212,6 @@ where
|
||||
Some(fut) => fut,
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-get_or_init_fut
|
||||
|
||||
/// Tries to fuse the future with a new request.
|
||||
///
|
||||
|
||||
@ -17,7 +17,6 @@ use serde::{Deserialize, Serialize};
|
||||
#[derive(
|
||||
Copy, Clone, Debug, PartialEq, Eq, Hash, RlpEncodable, RlpDecodable, Serialize, Deserialize,
|
||||
)]
|
||||
// ANCHOR: struct-GetBlockHeaders
|
||||
pub struct GetBlockHeaders {
|
||||
/// The block number or hash that the peer should start returning headers from.
|
||||
pub start_block: BlockHashOrNumber,
|
||||
@ -34,7 +33,6 @@ pub struct GetBlockHeaders {
|
||||
/// The direction in which the headers should be returned in.
|
||||
pub direction: HeadersDirection,
|
||||
}
|
||||
// ANCHOR_END: struct-GetBlockHeaders
|
||||
|
||||
/// The response to [`GetBlockHeaders`], containing headers if any headers were found.
|
||||
#[derive(
|
||||
@ -71,12 +69,10 @@ impl From<Vec<Header>> for BlockHeaders {
|
||||
Deserialize,
|
||||
Default,
|
||||
)]
|
||||
// ANCHOR: struct-GetBlockBodies
|
||||
pub struct GetBlockBodies(
|
||||
/// The block hashes to request bodies for.
|
||||
pub Vec<H256>,
|
||||
);
|
||||
// ANCHOR_END: struct-GetBlockBodies
|
||||
|
||||
impl From<Vec<H256>> for GetBlockBodies {
|
||||
fn from(hashes: Vec<H256>) -> Self {
|
||||
|
||||
@ -30,7 +30,6 @@ pub fn rng_secret_key() -> SecretKey {
|
||||
}
|
||||
|
||||
/// All network related initialization settings.
|
||||
// ANCHOR: struct-NetworkConfig
|
||||
pub struct NetworkConfig<C> {
|
||||
/// The client type that can interact with the chain.
|
||||
pub client: Arc<C>,
|
||||
@ -70,7 +69,6 @@ pub struct NetworkConfig<C> {
|
||||
/// Sets the hello message for the p2p handshake in RLPx
|
||||
pub hello_message: HelloMessage,
|
||||
}
|
||||
// ANCHOR_END: struct-NetworkConfig
|
||||
|
||||
// === impl NetworkConfig ===
|
||||
|
||||
|
||||
@ -46,7 +46,6 @@ const APPROX_HEADER_SIZE: usize = 500;
|
||||
/// Manages eth related requests on top of the p2p network.
|
||||
///
|
||||
/// This can be spawned to another task and is supposed to be run as background service.
|
||||
// ANCHOR: struct-EthRequestHandler
|
||||
#[must_use = "Manager does nothing unless polled."]
|
||||
pub struct EthRequestHandler<C> {
|
||||
/// The client type that can interact with the chain.
|
||||
@ -58,7 +57,6 @@ pub struct EthRequestHandler<C> {
|
||||
/// Incoming request from the [NetworkManager](crate::NetworkManager).
|
||||
incoming_requests: UnboundedReceiverStream<IncomingEthRequest>,
|
||||
}
|
||||
// ANCHOR_END: struct-EthRequestHandler
|
||||
|
||||
// === impl EthRequestHandler ===
|
||||
impl<C> EthRequestHandler<C> {
|
||||
@ -77,7 +75,6 @@ where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
/// Returns the list of requested heders
|
||||
// ANCHOR:fn-get_headers_response
|
||||
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<Header> {
|
||||
let GetBlockHeaders { start_block, limit, skip, direction } = request;
|
||||
|
||||
@ -142,7 +139,6 @@ where
|
||||
|
||||
headers
|
||||
}
|
||||
// ANCHOR_END:fn-get_headers_response
|
||||
|
||||
fn on_headers_request(
|
||||
&mut self,
|
||||
@ -154,7 +150,6 @@ where
|
||||
let _ = response.send(Ok(BlockHeaders(headers)));
|
||||
}
|
||||
|
||||
// ANCHOR: fn-on_bodies_request
|
||||
fn on_bodies_request(
|
||||
&mut self,
|
||||
_peer_id: PeerId,
|
||||
@ -187,7 +182,6 @@ where
|
||||
|
||||
let _ = response.send(Ok(BlockBodies(bodies)));
|
||||
}
|
||||
// ANCHOR_END: fn-on_bodies_request
|
||||
}
|
||||
|
||||
/// An endless future.
|
||||
@ -199,7 +193,6 @@ where
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
// ANCHOR: fn-poll
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
@ -220,7 +213,6 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-poll
|
||||
}
|
||||
|
||||
/// Represents a handled [`GetBlockHeaders`] requests
|
||||
|
||||
@ -15,7 +15,6 @@ use reth_primitives::{PeerId, WithPeerId, H256};
|
||||
use tokio::sync::{mpsc::UnboundedSender, oneshot};
|
||||
|
||||
/// Front-end API for fetching data from the network.
|
||||
// ANCHOR: struct-FetchClient
|
||||
#[derive(Debug)]
|
||||
pub struct FetchClient {
|
||||
/// Sender half of the request channel.
|
||||
@ -23,7 +22,6 @@ pub struct FetchClient {
|
||||
/// The handle to the peers
|
||||
pub(crate) peers_handle: PeersHandle,
|
||||
}
|
||||
// ANCHOR_END: struct-FetchClient
|
||||
|
||||
impl DownloadClient for FetchClient {
|
||||
fn report_bad_message(&self, peer_id: PeerId) {
|
||||
@ -31,7 +29,6 @@ impl DownloadClient for FetchClient {
|
||||
}
|
||||
}
|
||||
|
||||
// ANCHOR: trait-HeadersClient-BodiesClient
|
||||
#[async_trait::async_trait]
|
||||
impl HeadersClient for FetchClient {
|
||||
/// Sends a `GetBlockHeaders` request to an available peer.
|
||||
@ -50,4 +47,3 @@ impl BodiesClient for FetchClient {
|
||||
rx.await?
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: trait-HeadersClient-BodiesClient
|
||||
|
||||
@ -25,7 +25,6 @@ pub use client::FetchClient;
|
||||
/// peers and sends the response once ready.
|
||||
///
|
||||
/// This type maintains a list of connected peers that are available for requests.
|
||||
// ANCHOR: struct-StateFetcher
|
||||
pub struct StateFetcher {
|
||||
/// Currently active [`GetBlockHeaders`] requests
|
||||
inflight_headers_requests:
|
||||
@ -44,7 +43,6 @@ pub struct StateFetcher {
|
||||
/// Sender for download requests, used to detach a [`FetchClient`]
|
||||
download_requests_tx: UnboundedSender<DownloadRequest>,
|
||||
}
|
||||
// ANCHOR_END: struct-StateFetcher
|
||||
|
||||
// === impl StateSyncer ===
|
||||
|
||||
|
||||
@ -79,7 +79,6 @@ use tracing::{error, info, trace, warn};
|
||||
/// ethrequest <--> |ETH request handing| NetworkManager
|
||||
/// discovery --> |Discovered peers| NetworkManager
|
||||
/// ```
|
||||
// ANCHOR: struct-NetworkManager
|
||||
#[must_use = "The NetworkManager does nothing unless polled"]
|
||||
pub struct NetworkManager<C> {
|
||||
/// The type that manages the actual network part, which includes connections.
|
||||
@ -104,7 +103,6 @@ pub struct NetworkManager<C> {
|
||||
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
|
||||
num_active_peers: Arc<AtomicUsize>,
|
||||
}
|
||||
// ANCHOR_END: struct-NetworkManager
|
||||
|
||||
// === impl NetworkManager ===
|
||||
impl<C> NetworkManager<C> {
|
||||
@ -672,7 +670,6 @@ where
|
||||
///
|
||||
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
|
||||
/// etc.
|
||||
// ANCHOR: enum-NetworkEvent
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum NetworkEvent {
|
||||
/// Closed the peer session.
|
||||
@ -698,7 +695,6 @@ pub enum NetworkEvent {
|
||||
/// Event emitted when a new peer is removed
|
||||
PeerRemoved(PeerId),
|
||||
}
|
||||
// ANCHOR_END: enum-NetworkEvent
|
||||
|
||||
/// Bundles all listeners for [`NetworkEvent`]s.
|
||||
#[derive(Default)]
|
||||
|
||||
@ -167,7 +167,6 @@ impl StatusUpdater for NetworkHandle {
|
||||
}
|
||||
}
|
||||
|
||||
// ANCHOR: struct-NetworkInner
|
||||
#[derive(Debug)]
|
||||
struct NetworkInner {
|
||||
/// Number of active peer sessions the node's currently handling.
|
||||
@ -183,7 +182,6 @@ struct NetworkInner {
|
||||
/// The mode of the network
|
||||
network_mode: NetworkMode,
|
||||
}
|
||||
// ANCHOR_END: struct-NetworkInner
|
||||
|
||||
/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
|
||||
#[allow(missing_docs)]
|
||||
|
||||
@ -39,7 +39,6 @@ const PEER_BLOCK_CACHE_LIMIT: usize = 512;
|
||||
/// then send to the session of the peer.
|
||||
///
|
||||
/// This type is also responsible for responding for received request.
|
||||
// ANCHOR: struct-NetworkState
|
||||
pub struct NetworkState<C> {
|
||||
/// All active peers and their state.
|
||||
active_peers: HashMap<PeerId, ActivePeer>,
|
||||
@ -59,7 +58,6 @@ pub struct NetworkState<C> {
|
||||
/// then queue in the request and notify the fetcher once the result has been received.
|
||||
state_fetcher: StateFetcher,
|
||||
}
|
||||
// ANCHOR_END: struct-NetworkState
|
||||
|
||||
impl<C> NetworkState<C>
|
||||
where
|
||||
|
||||
@ -61,7 +61,6 @@ use tracing::{trace, warn};
|
||||
/// fetchRequest --> |request Headers, Bodies| StateFetch
|
||||
/// State --> |poll pending requests| StateFetch
|
||||
/// ```
|
||||
// ANCHOR: struct-Swarm
|
||||
#[must_use = "Swarm does nothing unless polled"]
|
||||
pub(crate) struct Swarm<C> {
|
||||
/// Listens for new incoming connections.
|
||||
@ -71,7 +70,6 @@ pub(crate) struct Swarm<C> {
|
||||
/// Tracks the entire state of the network and handles events received from the sessions.
|
||||
state: NetworkState<C>,
|
||||
}
|
||||
// ANCHOR_END: struct-Swarm
|
||||
|
||||
// === impl Swarm ===
|
||||
|
||||
|
||||
@ -38,12 +38,10 @@ const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
|
||||
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
|
||||
|
||||
/// Api to interact with [`TransactionsManager`] task.
|
||||
// ANCHOR: struct-TransactionsHandle
|
||||
pub struct TransactionsHandle {
|
||||
/// Command channel to the [`TransactionsManager`]
|
||||
manager_tx: mpsc::UnboundedSender<TransactionsCommand>,
|
||||
}
|
||||
// ANCHOR_END: struct-TransactionsHandle
|
||||
|
||||
// === impl TransactionsHandle ===
|
||||
|
||||
@ -74,7 +72,6 @@ impl TransactionsHandle {
|
||||
///
|
||||
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
|
||||
/// propagate new transactions over the network.
|
||||
// ANCHOR: struct-TransactionsManager
|
||||
#[must_use = "Manager does nothing unless polled."]
|
||||
pub struct TransactionsManager<Pool> {
|
||||
/// Access to the transaction pool.
|
||||
@ -105,7 +102,6 @@ pub struct TransactionsManager<Pool> {
|
||||
/// Incoming events from the [`NetworkManager`](crate::NetworkManager).
|
||||
transaction_events: UnboundedReceiverStream<NetworkTransactionEvent>,
|
||||
}
|
||||
// ANCHOR_END: struct-TransactionsManager
|
||||
|
||||
impl<Pool: TransactionPool> TransactionsManager<Pool> {
|
||||
/// Sets up a new instance.
|
||||
@ -151,7 +147,6 @@ where
|
||||
}
|
||||
|
||||
/// Request handler for an incoming request for transactions
|
||||
// ANCHOR: fn-on_get_pooled_transactions
|
||||
fn on_get_pooled_transactions(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
@ -173,7 +168,6 @@ where
|
||||
let _ = response.send(Ok(resp));
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-on_get_pooled_transactions
|
||||
|
||||
/// Invoked when a new transaction is pending.
|
||||
///
|
||||
@ -186,7 +180,6 @@ where
|
||||
/// complete transaction object if it is unknown to them. The dissemination of complete
|
||||
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
|
||||
/// and won't need to request it.
|
||||
// ANCHOR: fn-on_new_transactions-propagate_transactions
|
||||
fn on_new_transactions(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
|
||||
trace!(target: "net::tx", "Start propagating transactions");
|
||||
|
||||
@ -239,10 +232,8 @@ where
|
||||
|
||||
propagated
|
||||
}
|
||||
// ANCHOR_END: fn-on_new_transactions-propagate_transactions
|
||||
|
||||
/// Request handler for an incoming `NewPooledTransactionHashes`
|
||||
// ANCHOR: fn-on_new_pooled_transactions
|
||||
fn on_new_pooled_transactions(&mut self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
let mut transactions = msg.0;
|
||||
@ -269,7 +260,6 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-on_new_pooled_transactions
|
||||
|
||||
/// Handles dedicated transaction events related tot the `eth` protocol.
|
||||
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
|
||||
@ -296,7 +286,6 @@ where
|
||||
}
|
||||
|
||||
/// Handles a received event related to common network events.
|
||||
// ANCHOR: fn-on_network_event
|
||||
fn on_network_event(&mut self, event: NetworkEvent) {
|
||||
match event {
|
||||
NetworkEvent::SessionClosed { peer_id, .. } => {
|
||||
@ -327,10 +316,8 @@ where
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-on_network_event
|
||||
|
||||
/// Starts the import process for the given transactions.
|
||||
// ANCHOR: fn-import_transactions
|
||||
fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec<TransactionSigned>) {
|
||||
let mut has_bad_transactions = false;
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
@ -371,7 +358,6 @@ where
|
||||
self.report_bad_message(peer_id);
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: fn-import_transactions
|
||||
|
||||
fn report_bad_message(&self, peer_id: PeerId) {
|
||||
self.network.reputation_change(peer_id, ReputationChangeKind::BadTransactions);
|
||||
@ -400,7 +386,6 @@ where
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
// ANCHOR: fn-poll
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
@ -464,39 +449,31 @@ where
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
// ANCHOR_END: fn-poll
|
||||
}
|
||||
|
||||
/// An inflight request for `PooledTransactions` from a peer
|
||||
#[allow(missing_docs)]
|
||||
// ANCHOR: struct-GetPooledTxRequest
|
||||
struct GetPooledTxRequest {
|
||||
peer_id: PeerId,
|
||||
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
|
||||
}
|
||||
// ANCHOR_END: struct-GetPooledTxRequest
|
||||
|
||||
/// Tracks a single peer
|
||||
// ANCHOR: struct-Peer
|
||||
struct Peer {
|
||||
/// Keeps track of transactions that we know the peer has seen.
|
||||
transactions: LruCache<H256>,
|
||||
/// A communication channel directly to the session task.
|
||||
request_tx: PeerRequestSender,
|
||||
}
|
||||
// ANCHOR_END: struct-Peer
|
||||
|
||||
/// Commands to send to the [`TransactionManager`]
|
||||
// ANCHOR: enum-TransactionsCommand
|
||||
enum TransactionsCommand {
|
||||
PropagateHash(H256),
|
||||
}
|
||||
// ANCHOR_END: enum-TransactionsCommand
|
||||
|
||||
/// All events related to transactions emitted by the network.
|
||||
#[derive(Debug)]
|
||||
#[allow(missing_docs)]
|
||||
// ANCHOR: enum-NetworkTransactionEvent
|
||||
pub enum NetworkTransactionEvent {
|
||||
/// Received list of transactions from the given peer.
|
||||
IncomingTransactions { peer_id: PeerId, msg: Transactions },
|
||||
@ -509,4 +486,3 @@ pub enum NetworkTransactionEvent {
|
||||
response: oneshot::Sender<RequestResult<PooledTransactions>>,
|
||||
},
|
||||
}
|
||||
// ANCHOR_END: enum-NetworkTransactionEvent
|
||||
|
||||
@ -22,7 +22,6 @@ impl Deref for Block {
|
||||
}
|
||||
|
||||
/// Sealed Ethereum full block.
|
||||
// ANCHOR: struct-SealedBlock
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
|
||||
pub struct SealedBlock {
|
||||
/// Locked block header.
|
||||
@ -32,7 +31,6 @@ pub struct SealedBlock {
|
||||
/// Ommer/uncle headers
|
||||
pub ommers: Vec<SealedHeader>,
|
||||
}
|
||||
// ANCHOR_END: struct-SealedBlock
|
||||
|
||||
impl SealedBlock {
|
||||
/// Header hash.
|
||||
|
||||
@ -209,7 +209,6 @@ impl Decodable for Header {
|
||||
|
||||
/// A [`Header`] that is sealed at a precalculated hash, use [`SealedHeader::unseal()`] if you want
|
||||
/// to modify header.
|
||||
// ANCHOR: struct-SealedHeader
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct SealedHeader {
|
||||
/// Locked Header fields.
|
||||
@ -217,7 +216,6 @@ pub struct SealedHeader {
|
||||
/// Locked Header hash.
|
||||
hash: BlockHash,
|
||||
}
|
||||
// ANCHOR_END: struct-SealedHeader
|
||||
|
||||
impl Default for SealedHeader {
|
||||
fn default() -> Self {
|
||||
|
||||
@ -69,7 +69,6 @@ impl Signature {
|
||||
}
|
||||
|
||||
/// Recover signature from hash.
|
||||
// ANCHOR: fn-recover_signer
|
||||
pub(crate) fn recover_signer(&self, hash: H256) -> Option<Address> {
|
||||
let mut sig: [u8; 65] = [0; 65];
|
||||
|
||||
@ -81,5 +80,4 @@ impl Signature {
|
||||
// errors and we care only if recovery is passing or not.
|
||||
secp256k1::recover(&sig, hash.as_fixed_bytes()).ok()
|
||||
}
|
||||
// ANCHOR_END: fn-recover_signer
|
||||
}
|
||||
|
||||
@ -70,13 +70,11 @@ use state::*;
|
||||
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
|
||||
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
|
||||
/// request an unwind manually (see [Pipeline::unwind]).
|
||||
// ANCHOR: struct-Pipeline
|
||||
pub struct Pipeline<DB: Database> {
|
||||
stages: Vec<QueuedStage<DB>>,
|
||||
max_block: Option<BlockNumber>,
|
||||
events_sender: MaybeSender<PipelineEvent>,
|
||||
}
|
||||
// ANCHOR_END: struct-Pipeline
|
||||
|
||||
impl<DB: Database> Default for Pipeline<DB> {
|
||||
fn default() -> Self {
|
||||
|
||||
@ -58,7 +58,6 @@ pub struct UnwindOutput {
|
||||
///
|
||||
/// Stages receive [`Transaction`] which manages the lifecycle of a transaction,
|
||||
/// such as when to commit / reopen a new one etc.
|
||||
// ANCHOR: trait-Stage
|
||||
#[async_trait]
|
||||
pub trait Stage<DB: Database>: Send + Sync {
|
||||
/// Get the ID of the stage.
|
||||
@ -80,4 +79,3 @@ pub trait Stage<DB: Database>: Send + Sync {
|
||||
input: UnwindInput,
|
||||
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
|
||||
}
|
||||
// ANCHOR_END: trait-Stage
|
||||
|
||||
@ -196,7 +196,6 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
|
||||
.stack_size(50 * 1024 * 1024)
|
||||
.spawn_scoped(scope, || {
|
||||
// execute and store output to results
|
||||
// ANCHOR: snippet-block_change_patches
|
||||
reth_executor::executor::execute_and_verify_receipt(
|
||||
header,
|
||||
&recovered_transactions,
|
||||
@ -204,7 +203,6 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
|
||||
&self.config,
|
||||
state_provider,
|
||||
)
|
||||
// ANCHOR_END: snippet-block_change_patches
|
||||
})
|
||||
.expect("Expects that thread name is not null");
|
||||
handle.join().expect("Expects for thread to not panic")
|
||||
|
||||
1093
docs/repo/crates/network.md
Normal file
1093
docs/repo/crates/network.md
Normal file
File diff suppressed because it is too large
Load Diff
@ -2,15 +2,43 @@
|
||||
|
||||
The `stages` lib plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SenderRecoveryStage`, and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages are queued up and stored within the Reth pipeline.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/stages/src/pipeline.rs anchor=struct-Pipeline}}
|
||||
[File: crates/stages/src/pipeline.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/pipeline.rs)
|
||||
```rust,ignore
|
||||
pub struct Pipeline<DB: Database> {
|
||||
stages: Vec<QueuedStage<DB>>,
|
||||
max_block: Option<BlockNumber>,
|
||||
events_sender: MaybeSender<PipelineEvent>,
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
When the node is first started, a new `Pipeline` is initialized and all of the stages are added into `Pipeline.stages`. Then, the `Pipeline::run` function is called, which starts the pipeline, executing all of the stages continuously in an infinite loop. This process syncs the chain, keeping everything up to date with the chain tip.
|
||||
|
||||
Each stage within the pipeline implements the `Stage` trait which provides function interfaces to get the stage id, execute the stage and unwind the changes to the database if there was an issue during the stage execution.
|
||||
|
||||
[File: crates/stages/src/stage.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stage.rs)
|
||||
```rust,ignore
|
||||
pub trait Stage<DB: Database>: Send + Sync {
|
||||
/// Get the ID of the stage.
|
||||
///
|
||||
/// Stage IDs must be unique.
|
||||
fn id(&self) -> StageId;
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/stages/src/stage.rs anchor=trait-Stage}}
|
||||
/// Execute the stage.
|
||||
async fn execute(
|
||||
&mut self,
|
||||
tx: &mut Transaction<'_, DB>,
|
||||
input: ExecInput,
|
||||
) -> Result<ExecOutput, StageError>;
|
||||
|
||||
/// Unwind the stage.
|
||||
async fn unwind(
|
||||
&mut self,
|
||||
tx: &mut Transaction<'_, DB>,
|
||||
input: UnwindInput,
|
||||
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
|
||||
}
|
||||
```
|
||||
|
||||
To get a better idea of what is happening at each part of the pipeline, lets walk through what is going on under the hood within the `execute()` function at each stage, starting with `HeaderStage`.
|
||||
|
||||
@ -21,11 +49,31 @@ To get a better idea of what is happening at each part of the pipeline, lets wal
|
||||
<!-- TODO: Cross-link to eth/65 chapter when it's written -->
|
||||
The `HeaderStage` is responsible for syncing the block headers, validating the header integrity and writing the headers to the database. When the `execute()` function is called, the local head of the chain is updated to the most recent block height previously executed by the stage. At this point, the node status is also updated with that block's height, hash and total difficulty. These values are used during any new eth/65 handshakes. After updating the head, a stream is established with other peers in the network to sync the missing chain headers between the most recent state stored in the database and the chain tip. The `HeaderStage` contains a `downloader` attribute, which is a type that implements the `HeaderDownloader` trait. The `stream()` method from this trait is used to fetch headers from the network.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/interfaces/src/p2p/headers/downloader.rs anchor=trait-HeaderDownloader}}
|
||||
[File: crates/interfaces/src/p2p/headers/downloader.rs](https://github.com/paradigmxyz/reth/blob/main/crates/interfaces/src/p2p/headers/downloader.rs)
|
||||
```rust,ignore
|
||||
pub trait HeaderDownloader: Downloader {
|
||||
/// Stream the headers
|
||||
fn stream(&self, head: SealedHeader, tip: H256) -> DownloadStream<'_, SealedHeader>;
|
||||
|
||||
/// Validate whether the header is valid in relation to it's parent
|
||||
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
|
||||
validate_header_download(self.consensus(), header, parent)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `HeaderStage` relies on the downloader stream to return the headers in descending order starting from the chain tip down to the latest block in the database. While other stages in the `Pipeline` start from the most recent block in the database up to the chain tip, the `HeaderStage` works in reverse to avoid [long-range attacks](https://messari.io/report/long-range-attack). When a node downloads headers in ascending order, it will not know if it is being subjected to a long-range attack until it reaches the most recent blocks. To combat this, the `HeaderStage` starts by getting the chain tip from the Consensus Layer, verifies the tip, and then walks backwards by the parent hash. Each value yielded from the stream is a `SealedHeader`.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/primitives/src/header.rs anchor=struct-SealedHeader}}
|
||||
[File: crates/primitives/src/header.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/header.rs)
|
||||
```rust,ignore
|
||||
pub struct SealedHeader {
|
||||
/// Locked Header fields.
|
||||
header: Header,
|
||||
/// Locked Header hash.
|
||||
hash: BlockHash,
|
||||
}
|
||||
```
|
||||
|
||||
Each `SealedHeader` is then validated to ensure that it has the proper parent. Note that this is only a basic response validation, and the `HeaderDownloader` uses the `validate` method during the `stream`, so that each header is validated according to the consensus specification before the header is yielded from the stream. After this, each header is then written to the database. If a header is not valid or the stream encounters any other error, the error is propagated up through the stage execution, the changes to the database are unwound and the stage is resumed from the most recent valid state.
|
||||
|
||||
@ -45,7 +93,17 @@ When the `BodyStage` is looking at the headers to determine which block to downl
|
||||
|
||||
Once the `BodyStage` determines which block bodies to fetch, a new `bodies_stream` is created which downloads all of the bodies from the `starting_block`, up until the `target_block` specified. Each time the `bodies_stream` yields a value, a `SealedBlock` is created using the block header, the ommers hash and the newly downloaded block body.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/primitives/src/block.rs anchor=struct-SealedBlock}}
|
||||
[File: crates/primitives/src/block.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/block.rs)
|
||||
```rust,ignore
|
||||
pub struct SealedBlock {
|
||||
/// Locked block header.
|
||||
pub header: SealedHeader,
|
||||
/// Transactions with signatures.
|
||||
pub body: Vec<TransactionSigned>,
|
||||
/// Ommer/uncle headers
|
||||
pub ommers: Vec<SealedHeader>,
|
||||
}
|
||||
```
|
||||
|
||||
The new block is then pre-validated, checking that the ommers hash and transactions root in the block header are the same in the block body. Following a successful pre-validation, the `BodyStage` loops through each transaction in the `block.body`, adding the transaction to the database. This process is repeated for every downloaded block body, with the `BodyStage` returning `Ok(ExecOutput { stage_progress: highest_block, reached_tip: true, done })` signaling it successfully completed.
|
||||
|
||||
@ -55,7 +113,18 @@ The new block is then pre-validated, checking that the ommers hash and transacti
|
||||
|
||||
Following a successful `BodyStage`, the `SenderRecoveryStage` starts to execute. The `SenderRecoveryStage` is responsible for recovering the transaction sender for each of the newly added transactions to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderRecoveryStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP encoded transaction bytes. This hash is then passed into the `recover_signer` function.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/primitives/src/transaction/signature.rs anchor=fn-recover_signer}}
|
||||
[File: crates/primitives/src/transaction/signature.rs](https://github.com/paradigmxyz/reth/blob/main/crates/primitives/src/transaction/signature.rs)
|
||||
```rust,ignore
|
||||
pub(crate) fn recover_signer(&self, hash: H256) -> Option<Address> {
|
||||
let mut sig: [u8; 65] = [0; 65];
|
||||
|
||||
self.r.to_big_endian(&mut sig[0..32]);
|
||||
self.s.to_big_endian(&mut sig[32..64]);
|
||||
sig[64] = self.odd_y_parity as u8;
|
||||
|
||||
secp256k1::recover(&sig, hash.as_fixed_bytes()).ok()
|
||||
}
|
||||
```
|
||||
|
||||
In an [ECDSA (Elliptic Curve Digital Signature Algorithm) signature](https://wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm), the "r", "s", and "v" values are three pieces of data that are used to mathematically verify the authenticity of a digital signature. ECDSA is a widely used algorithm for generating and verifying digital signatures, and it is often used in cryptocurrencies like Ethereum.
|
||||
|
||||
@ -69,7 +138,16 @@ Once the transaction signer has been recovered, the signer is then added to the
|
||||
|
||||
Finally, after all headers, bodies and senders are added to the database, the `ExecutionStage` starts to execute. This stage is responsible for executing all of the transactions and updating the state stored in the database. For every new block header added to the database, the corresponding transactions have their signers attached to them and `reth_executor::executor::execute_and_verify_receipt()` is called, pushing the state changes resulting from the execution to a `Vec`.
|
||||
|
||||
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/stages/src/stages/execution.rs anchor=snippet-block_change_patches}}
|
||||
[File: crates/stages/src/stages/execution.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages/execution.rs)
|
||||
```rust,ignore
|
||||
reth_executor::executor::execute_and_verify_receipt(
|
||||
header,
|
||||
&recovered_transactions,
|
||||
ommers,
|
||||
&self.config,
|
||||
state_provider,
|
||||
)
|
||||
```
|
||||
|
||||
After all headers and their corresponding transactions have been executed, all of the resulting state changes are applied to the database, updating account balances, account bytecode and other state changes. After applying all of the execution state changes, if there was a block reward, it is applied to the validator's account.
|
||||
|
||||
Reference in New Issue
Block a user