chore: use new aquamarine macro (#5785)

Bjerg
2023-12-15 16:49:01 +02:00
committed by GitHub
parent bf37c8a076
commit 6af8e0f7ea
14 changed files with 229 additions and 225 deletions
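
Every Rust change in this diff follows the same pattern: the inline mermaid fence is deleted from the doc comment, the diagram moves to a file under docs/mermaid/, and the doc comment references it with aquamarine 0.4's include_mmd! directive so the rendered docs still show the diagram. A minimal before/after sketch of that pattern (the WidgetOld/WidgetNew types and the widget.mmd path are illustrative, not part of this PR):

```rust
// Before (aquamarine 0.3 style): the diagram is inlined in the doc comment.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Coordinates widget state.
///
/// ```mermaid
/// graph TB
///     Input --> Widget
/// ```
pub struct WidgetOld;

// After (aquamarine 0.4): the diagram lives in a checked-in .mmd file and is
// spliced into the rendered docs at build time. The path mirrors this PR's
// docs/mermaid/ layout but is hypothetical here.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Coordinates widget state.
///
/// include_mmd!("docs/mermaid/widget.mmd")
pub struct WidgetNew;
```

Keeping the diagrams in standalone .mmd files also lets them be previewed and edited with ordinary mermaid tooling instead of living only inside doc comments.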

Cargo.lock generated

@@ -332,16 +332,16 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
 [[package]]
 name = "aquamarine"
-version = "0.3.2"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1"
+checksum = "074b80d14d0240b6ce94d68f059a2d26a5d77280ae142662365a21ef6e2594ef"
 dependencies = [
  "include_dir",
  "itertools 0.10.5",
  "proc-macro-error",
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.39",
 ]

 [[package]]


@@ -157,7 +157,7 @@ boa_engine = "0.17"
 boa_gc = "0.17"

 # misc
-aquamarine = "0.3"
+aquamarine = "0.4"
 bytes = "1.5"
 bitflags = "2.4"
 clap = "4"


@@ -37,40 +37,25 @@ use tracing::{debug, error, info, instrument, trace, warn};
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// A Tree of chains.
 ///
-/// Mermaid flowchart represents all the states a block can have inside the blockchaintree.
-/// Green blocks belong to canonical chain and are saved inside then database, they are our main
-/// chain. Pending blocks and sidechains are found in memory inside [`BlockchainTree`].
-/// Both pending and sidechains have same mechanisms only difference is when they get committed to
-/// the database. For pending it is an append operation but for sidechains they need to move current
-/// canonical blocks to BlockchainTree and commit the sidechain to the database to become canonical
-/// chain (reorg). ```mermaid
-/// flowchart BT
-/// subgraph canonical chain
-/// CanonState:::state
-/// block0canon:::canon -->block1canon:::canon -->block2canon:::canon -->block3canon:::canon -->
-/// block4canon:::canon --> block5canon:::canon end
-/// block5canon --> block6pending1:::pending
-/// block5canon --> block6pending2:::pending
-/// subgraph sidechain2
-/// S2State:::state
-/// block3canon --> block4s2:::sidechain --> block5s2:::sidechain
-/// end
-/// subgraph sidechain1
-/// S1State:::state
-/// block2canon --> block3s1:::sidechain --> block4s1:::sidechain --> block5s1:::sidechain -->
-/// block6s1:::sidechain end
-/// classDef state fill:#1882C4
-/// classDef canon fill:#8AC926
-/// classDef pending fill:#FFCA3A
-/// classDef sidechain fill:#FF595E
-/// ```
-///
+/// The flowchart represents all the states a block can have inside the tree.
+///
+/// - Green blocks belong to the canonical chain and are saved inside the database.
+/// - Pending blocks and sidechains are found in-memory inside [`BlockchainTree`].
+///
+/// Both pending chains and sidechains have the same mechanisms, the only difference is when they
+/// get committed to the database.
+///
+/// For pending, it is an append operation, but for sidechains they need to move the current
+/// canonical blocks to the tree (by removing them from the database), and commit the sidechain
+/// blocks to the database to become the canonical chain (reorg).
+///
+/// include_mmd!("docs/mermaid/tree.mmd")
 ///
-/// main functions:
-/// * [BlockchainTree::insert_block]: Connect block to chain, execute it and if valid insert block
-/// into the tree.
-/// * [BlockchainTree::finalize_block]: Remove chains that are branch off the now finalized block.
-/// * [BlockchainTree::make_canonical]: Check if we have the hash of block that is the current
+/// # Main functions
+/// * [BlockchainTree::insert_block]: Connect a block to a chain, execute it, and if valid, insert
+/// the block into the tree.
+/// * [BlockchainTree::finalize_block]: Remove chains that branch off of the now finalized block.
+/// * [BlockchainTree::make_canonical]: Check if we have the hash of a block that is the current
 /// canonical head and commit it to db.
 #[derive(Debug)]
 pub struct BlockchainTree<DB: Database, EF: ExecutorFactory> {


@@ -18,44 +18,13 @@ use std::sync::{
 };
 use tokio::sync::{mpsc::UnboundedSender, oneshot};
+#[cfg_attr(doc, aquamarine::aquamarine)]
 /// Front-end API for fetching data from the network.
 ///
 /// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
 /// [`BodiesClient::get_block_bodies`] is handled internally.
-#[cfg_attr(doc, aquamarine::aquamarine)]
-/// ```mermaid
-/// sequenceDiagram
-//     participant Client as FetchClient
-//     participant Fetcher as StateFetcher
-//     participant State as NetworkState
-//     participant Session as Active Peer Session
-//     participant Peers as PeerManager
-//     loop Send Request, retry if retriable and remaining retries
-//         Client->>Fetcher: DownloadRequest{GetHeaders, GetBodies}
-//         Note over Client,Fetcher: Request and oneshot Sender sent via `request_tx` channel
-//         loop Process buffered requests
-//             State->>Fetcher: poll action
-//             Fetcher->>Fetcher: Select Available Peer
-//             Note over Fetcher: Peer is available if it's currently idle, no inflight requests
-//             Fetcher->>State: FetchAction::BlockDownloadRequest
-//             State->>Session: Delegate Request
-//             Note over State,Session: Request and oneshot Sender sent via `to_session_tx` channel
-//         end
-//         Session->>Session: Send Request to remote
-//         Session->>Session: Enforce Request timeout
-//         Session-->>State: Send Response Result via channel
-//         State->>Fetcher: Delegate Response
-//         Fetcher-->>Client: Send Response via channel
-//         opt Bad Response
-//             Client->>Peers: Penalize Peer
-//         end
-//         Peers->>Peers: Apply Reputation Change
-//         opt reputation dropped below threshold
-//             Peers->>State: Disconnect Session
-//             State->>Session: Delegate Disconnect
-//         end
-//     end
-/// ```
+///
+/// include_mmd!("docs/mermaid/fetch-client.mmd")
 #[derive(Debug, Clone)]
 pub struct FetchClient {
     /// Sender half of the request channel.


@@ -60,34 +60,14 @@ use tokio::sync::mpsc::{self, error::TrySendError};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::{debug, error, trace, warn};
+#[cfg_attr(doc, aquamarine::aquamarine)]
 /// Manages the _entire_ state of the network.
 ///
 /// This is an endless [`Future`] that consistently drives the state of the entire network forward.
 ///
 /// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
-#[cfg_attr(doc, aquamarine::aquamarine)]
-/// ```mermaid
-/// graph TB
-/// handle(NetworkHandle)
-/// events(NetworkEvents)
-/// transactions(Transactions Task)
-/// ethrequest(ETH Request Task)
-/// discovery(Discovery Task)
-/// subgraph NetworkManager
-/// direction LR
-/// subgraph Swarm
-/// direction TB
-/// B1[(Session Manager)]
-/// B2[(Connection Lister)]
-/// B3[(Network State)]
-/// end
-/// end
-/// handle <--> |request response channel| NetworkManager
-/// NetworkManager --> |Network events| events
-/// transactions <--> |transactions| NetworkManager
-/// ethrequest <--> |ETH request handing| NetworkManager
-/// discovery --> |Discovered peers| NetworkManager
-/// ```
+///
+/// include_mmd!("docs/mermaid/network-manager.mmd")
 #[derive(Debug)]
 #[must_use = "The NetworkManager does nothing unless polled"]
 pub struct NetworkManager<C> {

@@ -543,7 +523,7 @@ where
 if self.handle.mode().is_stake() {
 // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
-return
+return;
 }
 let msg = NewBlockMessage { hash, block: Arc::new(block) };
 self.swarm.state_mut().announce_new_block(msg);

@@ -638,7 +618,7 @@ where
 // This is only possible if the channel was deliberately closed since we always
 // have an instance of `NetworkHandle`
 error!("Network message channel closed.");
-return Poll::Ready(())
+return Poll::Ready(());
 }
 Poll::Ready(Some(msg)) => this.on_handle_message(msg),
 };

@@ -909,7 +889,7 @@ where
 if budget == 0 {
 // make sure we're woken up again
 cx.waker().wake_by_ref();
-break
+break;
 }
 }


@@ -23,6 +23,7 @@ use std::{
 };
 use tracing::trace;
+#[cfg_attr(doc, aquamarine::aquamarine)]
 /// Contains the connectivity related state of the network.
 ///
 /// A swarm emits [`SwarmEvent`]s when polled.

@@ -44,24 +45,8 @@ use tracing::trace;
 /// request channel for the created session and sends requests it receives from the
 /// [`StateFetcher`], which receives request objects from the client interfaces responsible for
 /// downloading headers and bodies.
-#[cfg_attr(doc, aquamarine::aquamarine)]
-/// ```mermaid
-/// graph TB
-/// connections(TCP Listener)
-/// Discovery[(Discovery)]
-/// fetchRequest(Client Interfaces)
-/// Sessions[(SessionManager)]
-/// SessionTask[(Peer Session)]
-/// State[(State)]
-/// StateFetch[(State Fetcher)]
-/// connections --> |incoming| Sessions
-/// State --> |initiate outgoing| Sessions
-/// Discovery --> |update peers| State
-/// Sessions --> |spawns| SessionTask
-/// SessionTask <--> |handle state requests| State
-/// fetchRequest --> |request Headers, Bodies| StateFetch
-/// State --> |poll pending requests| StateFetch
-/// ```
+///
+/// include_mmd!("docs/mermaid/swarm.mmd")
 #[derive(Debug)]
 #[must_use = "Swarm does nothing unless polled"]
 pub(crate) struct Swarm<C> {

@@ -207,7 +192,7 @@ where
 ListenerEvent::Incoming { stream, remote_addr } => {
 // Reject incoming connection if node is shutting down.
 if self.is_shutting_down() {
-return None
+return None;
 }
 // ensure we can handle an incoming connection from this address
 if let Err(err) =

@@ -225,13 +210,13 @@ where
 );
 }
 }
-return None
+return None;
 }
 match self.sessions.on_incoming(stream, remote_addr) {
 Ok(session_id) => {
 trace!(target: "net", ?remote_addr, "Incoming connection");
-return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
+return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr });
 }
 Err(err) => {
 trace!(target: "net", ?err, "Incoming connection rejected, capacity already reached.");

@@ -250,7 +235,7 @@ where
 match event {
 StateAction::Connect { remote_addr, peer_id } => {
 self.dial_outbound(remote_addr, peer_id);
-return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
+return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id });
 }
 StateAction::Disconnect { peer_id, reason } => {
 self.sessions.disconnect(peer_id, reason);

@@ -268,7 +253,7 @@ where
 StateAction::DiscoveredNode { peer_id, socket_addr, fork_id } => {
 // Don't try to connect to peer if node is shutting down
 if self.is_shutting_down() {
-return None
+return None;
 }
 // Insert peer only if no fork id or a valid fork id
 if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {

@@ -317,7 +302,7 @@ where
 loop {
 while let Poll::Ready(action) = this.state.poll(cx) {
 if let Some(event) = this.on_state_action(action) {
-return Poll::Ready(Some(event))
+return Poll::Ready(Some(event));
 }
 }

@@ -326,9 +311,9 @@ where
 Poll::Pending => {}
 Poll::Ready(event) => {
 if let Some(event) = this.on_session_event(event) {
-return Poll::Ready(Some(event))
+return Poll::Ready(Some(event));
 }
-continue
+continue;
 }
 }

@@ -337,13 +322,13 @@ where
 Poll::Pending => {}
 Poll::Ready(event) => {
 if let Some(event) = this.on_connection(event) {
-return Poll::Ready(Some(event))
+return Poll::Ready(Some(event));
 }
-continue
+continue;
 }
 }
-return Poll::Pending
+return Poll::Pending;
 }
 }
 }


@@ -49,39 +49,7 @@ pub type PipelineWithResult<DB> = (Pipeline<DB>, Result<ControlFlow, PipelineErr
 /// After the entire pipeline has been run, it will run again unless asked to stop (see
 /// [Pipeline::set_max_block]).
 ///
-/// ```mermaid
-/// graph TB
-/// Start[Start]
-/// Done[Done]
-/// Error[Error]
-/// subgraph Unwind
-/// StartUnwind(Unwind in reverse order of execution)
-/// UnwindStage(Unwind stage)
-/// NextStageToUnwind(Next stage)
-/// end
-/// subgraph Single loop
-/// RunLoop(Run loop)
-/// NextStage(Next stage)
-/// LoopDone(Loop done)
-/// subgraph Stage Execution
-/// Execute(Execute stage)
-/// end
-/// end
-/// Start --> RunLoop --> NextStage
-/// NextStage --> |No stages left| LoopDone
-/// NextStage --> |Next stage| Execute
-/// Execute --> |Not done| Execute
-/// Execute --> |Unwind requested| StartUnwind
-/// Execute --> |Done| NextStage
-/// Execute --> |Error| Error
-/// StartUnwind --> NextStageToUnwind
-/// NextStageToUnwind --> |Next stage| UnwindStage
-/// NextStageToUnwind --> |No stages left| RunLoop
-/// UnwindStage --> |Error| Error
-/// UnwindStage --> |Unwound| NextStageToUnwind
-/// LoopDone --> |Target block reached| Done
-/// LoopDone --> |Target block not reached| RunLoop
-/// ```
+/// include_mmd!("docs/mermaid/pipeline.mmd")
 ///
 /// # Unwinding
 ///

@@ -193,7 +161,7 @@ where
 max_block = ?self.max_block,
 "Terminating pipeline."
 );
-return Ok(())
+return Ok(());
 }
 }
 }

@@ -229,7 +197,7 @@ where
 ControlFlow::Continue { block_number } => self.progress.update(block_number),
 ControlFlow::Unwind { target, bad_block } => {
 self.unwind(target, Some(bad_block.number))?;
-return Ok(ControlFlow::Unwind { target, bad_block })
+return Ok(ControlFlow::Unwind { target, bad_block });
 }
 }

@@ -272,7 +240,7 @@ where
 "Unwind point too far for stage"
 );
 self.listeners.notify(PipelineEvent::Skipped { stage_id });
-continue
+continue;
 }
 debug!(

@@ -317,7 +285,7 @@ where
 }
 Err(err) => {
 self.listeners.notify(PipelineEvent::Error { stage_id });
-return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
+return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))));
 }
 }
 }

@@ -357,7 +325,7 @@ where
 // We reached the maximum block, so we skip the stage
 return Ok(ControlFlow::NoProgress {
 block_number: prev_checkpoint.map(|progress| progress.block_number),
-})
+});
 }
 let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

@@ -412,7 +380,7 @@ where
 ControlFlow::Continue { block_number }
 } else {
 ControlFlow::NoProgress { block_number: Some(block_number) }
-})
+});
 }
 }
 Err(err) => {

@@ -421,7 +389,7 @@ where
 if let Some(ctrl) =
 on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
 {
-return Ok(ctrl)
+return Ok(ctrl);
 }
 }
 }


@@ -32,45 +32,12 @@ use std::{
 sync::Arc,
 };
+#[cfg_attr(doc, aquamarine::aquamarine)]
 /// A pool that manages transactions.
 ///
 /// This pool maintains the state of all transactions and stores them accordingly.
-#[cfg_attr(doc, aquamarine::aquamarine)]
-/// ```mermaid
-/// graph TB
-/// subgraph TxPool
-/// direction TB
-/// pool[(All Transactions)]
-/// subgraph Subpools
-/// direction TB
-/// B3[(Queued)]
-/// B1[(Pending)]
-/// B2[(Basefee)]
-/// B4[(Blob)]
-/// end
-/// end
-/// discard([discard])
-/// production([Block Production])
-/// new([New Block])
-/// A[Incoming Tx] --> B[Validation] -->|insert| pool
-/// pool --> |if ready + blobfee too low| B4
-/// pool --> |if ready| B1
-/// pool --> |if ready + basfee too low| B2
-/// pool --> |nonce gap or lack of funds| B3
-/// pool --> |update| pool
-/// B1 --> |best| production
-/// B2 --> |worst| discard
-/// B3 --> |worst| discard
-/// B4 --> |worst| discard
-/// B1 --> |increased blob fee| B4
-/// B4 --> |decreased blob fee| B1
-/// B1 --> |increased base fee| B2
-/// B2 --> |decreased base fee| B1
-/// B3 --> |promote| B1
-/// B3 --> |promote| B2
-/// new --> |apply state changes| pool
-/// ```
+///
+/// include_mmd!("docs/mermaid/txpool.mmd")
 pub struct TxPool<T: TransactionOrdering> {
 /// Contains the currently known information about the senders.
 sender_info: FnvHashMap<SenderId, SenderInfo>,

@@ -509,7 +476,7 @@ impl<T: TransactionOrdering> TxPool<T> {
 on_chain_nonce: u64,
 ) -> PoolResult<AddedTransaction<T::Transaction>> {
 if self.contains(tx.hash()) {
-return Err(PoolError::new(*tx.hash(), PoolErrorKind::AlreadyImported))
+return Err(PoolError::new(*tx.hash(), PoolErrorKind::AlreadyImported));
 }
 // Update sender info with balance and nonce

@@ -737,7 +704,7 @@ impl<T: TransactionOrdering> TxPool<T> {
 }
 id = descendant;
 } else {
-return
+return;
 }
 }
 }

@@ -964,7 +931,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 let count = entry.get_mut();
 if *count == 1 {
 entry.remove();
-return
+return;
 }
 *count -= 1;
 }

@@ -1024,7 +991,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 ($iter:ident) => {
 'this: while let Some((peek, _)) = iter.peek() {
 if peek.sender != id.sender {
-break 'this
+break 'this;
 }
 iter.next();
 }

@@ -1043,7 +1010,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 current: tx.subpool,
 destination: Destination::Discard,
 });
-continue 'transactions
+continue 'transactions;
 }
 let ancestor = TransactionId::ancestor(id.nonce, info.state_nonce, id.sender);

@@ -1066,7 +1033,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 // If there's a nonce gap, we can shortcircuit, because there's nothing to update yet.
 if tx.state.has_nonce_gap() {
 next_sender!(iter);
-continue 'transactions
+continue 'transactions;
 }
 // Since this is the first transaction of the sender, it has no parked ancestors

@@ -1086,13 +1053,13 @@ impl<T: PoolTransaction> AllTransactions<T> {
 while let Some((peek, ref mut tx)) = iter.peek_mut() {
 if peek.sender != id.sender {
 // Found the next sender
-continue 'transactions
+continue 'transactions;
 }
 // can short circuit
 if tx.state.has_nonce_gap() {
 next_sender!(iter);
-continue 'transactions
+continue 'transactions;
 }
 // update cumulative cost

@@ -1254,7 +1221,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 fn contains_conflicting_transaction(&self, tx: &ValidPoolTransaction<T>) -> bool {
 let mut iter = self.txs_iter(tx.transaction_id.sender);
 if let Some((_, existing)) = iter.next() {
-return tx.tx_type_conflicts_with(&existing.transaction)
+return tx.tx_type_conflicts_with(&existing.transaction);
 }
 // no existing transaction for this sender
 false

@@ -1278,7 +1245,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 if current_txs >= self.max_account_slots {
 return Err(InsertErr::ExceededSenderTransactionsCapacity {
 transaction: Arc::new(transaction),
-})
+});
 }
 }
 if transaction.gas_limit() > self.block_gas_limit {

@@ -1286,12 +1253,12 @@ impl<T: PoolTransaction> AllTransactions<T> {
 block_gas_limit: self.block_gas_limit,
 tx_gas_limit: transaction.gas_limit(),
 transaction: Arc::new(transaction),
-})
+});
 }
 if self.contains_conflicting_transaction(&transaction) {
 // blob vs non blob transactions are mutually exclusive for the same sender
-return Err(InsertErr::TxTypeConflict { transaction: Arc::new(transaction) })
+return Err(InsertErr::TxTypeConflict { transaction: Arc::new(transaction) });
 }
 Ok(transaction)

@@ -1311,12 +1278,12 @@ impl<T: PoolTransaction> AllTransactions<T> {
 if let Some(ancestor) = ancestor {
 let Some(ancestor_tx) = self.txs.get(&ancestor) else {
 // ancestor tx is missing, so we can't insert the new blob
-return Err(InsertErr::BlobTxHasNonceGap { transaction: Arc::new(new_blob_tx) })
+return Err(InsertErr::BlobTxHasNonceGap { transaction: Arc::new(new_blob_tx) });
 };
 if ancestor_tx.state.has_nonce_gap() {
 // the ancestor transaction already has a nonce gap, so we can't insert the new
 // blob
-return Err(InsertErr::BlobTxHasNonceGap { transaction: Arc::new(new_blob_tx) })
+return Err(InsertErr::BlobTxHasNonceGap { transaction: Arc::new(new_blob_tx) });
 }
 // the max cost executing this transaction requires

@@ -1325,7 +1292,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 // check if the new blob would go into overdraft
 if cumulative_cost > on_chain_balance {
 // the transaction would go into overdraft
-return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
+return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) });
 }
 // ensure that a replacement would not shift already propagated blob transactions into

@@ -1342,14 +1309,14 @@ impl<T: PoolTransaction> AllTransactions<T> {
 cumulative_cost += tx.transaction.cost();
 if tx.transaction.is_eip4844() && cumulative_cost > on_chain_balance {
 // the transaction would shift
-return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
+return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) });
 }
 }
 }
 }
 } else if new_blob_tx.cost() > on_chain_balance {
 // the transaction would go into overdraft
-return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
+return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) });
 }
 Ok(new_blob_tx)

@@ -1369,7 +1336,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 if maybe_replacement.max_fee_per_gas() <=
 existing_transaction.max_fee_per_gas() * price_bump_multiplier
 {
-return true
+return true;
 }
 let existing_max_priority_fee_per_gas =

@@ -1382,7 +1349,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 existing_max_priority_fee_per_gas != 0 &&
 replacement_max_priority_fee_per_gas != 0
 {
-return true
+return true;
 }
 // check max blob fee per gas

@@ -1395,7 +1362,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 if replacement_max_blob_fee_per_gas <=
 existing_max_blob_fee_per_gas * price_bump_multiplier
 {
-return true
+return true;
 }
 }

@@ -1484,7 +1451,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 let fee_cap = transaction.max_fee_per_gas();
 if fee_cap < self.minimal_protocol_basefee as u128 {
-return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap })
+return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap });
 }
 if fee_cap >= self.pending_fees.base_fee as u128 {
 state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);

@@ -1523,7 +1490,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 return Err(InsertErr::Underpriced {
 transaction: pool_tx.transaction,
 existing: *entry.get().transaction.hash(),
-})
+});
 }
 let new_hash = *pool_tx.transaction.hash();
 let new_transaction = pool_tx.transaction.clone();

@@ -1566,7 +1533,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
 // If there's a nonce gap, we can shortcircuit
 if next_nonce != id.nonce {
-break
+break;
 }
 // close the nonce gap

docs/mermaid/fetch-client.mmd Normal file

@@ -0,0 +1,31 @@
sequenceDiagram
participant Client as FetchClient
participant Fetcher as StateFetcher
participant State as NetworkState
participant Session as Active Peer Session
participant Peers as PeerManager
loop Send Request, retry if retriable and remaining retries
Client->>Fetcher: DownloadRequest{GetHeaders, GetBodies}
Note over Client,Fetcher: Request and oneshot Sender sent via `request_tx` channel
loop Process buffered requests
State->>Fetcher: poll action
Fetcher->>Fetcher: Select Available Peer
Note over Fetcher: Peer is available if it's currently idle, no inflight requests
Fetcher->>State: FetchAction::BlockDownloadRequest
State->>Session: Delegate Request
Note over State,Session: Request and oneshot Sender sent via `to_session_tx` channel
end
Session->>Session: Send Request to remote
Session->>Session: Enforce Request timeout
Session-->>State: Send Response Result via channel
State->>Fetcher: Delegate Response
Fetcher-->>Client: Send Response via channel
opt Bad Response
Client->>Peers: Penalize Peer
end
Peers->>Peers: Apply Reputation Change
opt reputation dropped below threshold
Peers->>State: Disconnect Session
State->>Session: Delegate Disconnect
end
end
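
The notes in this diagram ("Request and oneshot Sender sent via `request_tx` channel") refer to the common tokio pattern of bundling a oneshot response sender inside each request that goes over an mpsc channel, so the service task can answer exactly the caller that asked. A generic sketch of that pattern, with illustrative names rather than reth's actual types:

```rust
use tokio::sync::{mpsc, oneshot};

/// Illustrative request type: the caller attaches a oneshot sender so the
/// service task can reply directly to this particular request.
struct DownloadRequest {
    start_block: u64,
    response: oneshot::Sender<Vec<u8>>,
}

async fn client_side(request_tx: mpsc::UnboundedSender<DownloadRequest>) {
    let (tx, rx) = oneshot::channel();
    // The request and its oneshot sender travel together over the channel.
    request_tx
        .send(DownloadRequest { start_block: 100, response: tx })
        .expect("service task alive");
    // The caller then awaits the reply on its end of the oneshot.
    if let Ok(body) = rx.await {
        println!("got {} bytes", body.len());
    }
}

async fn service_side(mut request_rx: mpsc::UnboundedReceiver<DownloadRequest>) {
    while let Some(req) = request_rx.recv().await {
        // ... fetch the data for `req.start_block` from a peer ...
        let _ = req.response.send(vec![0u8; 32]);
    }
}
```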

docs/mermaid/network-manager.mmd Normal file

@@ -0,0 +1,20 @@
graph TB
handle(NetworkHandle)
events(NetworkEvents)
transactions(Transactions Task)
ethrequest(ETH Request Task)
discovery(Discovery Task)
subgraph NetworkManager
direction LR
subgraph Swarm
direction TB
B1[(Session Manager)]
B2[(Connection Lister)]
B3[(Network State)]
end
end
handle <--> |request response channel| NetworkManager
NetworkManager --> |Network events| events
transactions <--> |transactions| NetworkManager
ethrequest <--> |ETH request handing| NetworkManager
discovery --> |Discovered peers| NetworkManager

docs/mermaid/pipeline.mmd Normal file

@@ -0,0 +1,31 @@
graph TB
Start[Start]
Done[Done]
Error[Error]
subgraph Unwind
StartUnwind(Unwind in reverse order of execution)
UnwindStage(Unwind stage)
NextStageToUnwind(Next stage)
end
subgraph Single loop
RunLoop(Run loop)
NextStage(Next stage)
LoopDone(Loop done)
subgraph Stage Execution
Execute(Execute stage)
end
end
Start --> RunLoop --> NextStage
NextStage --> |No stages left| LoopDone
NextStage --> |Next stage| Execute
Execute --> |Not done| Execute
Execute --> |Unwind requested| StartUnwind
Execute --> |Done| NextStage
Execute --> |Error| Error
StartUnwind --> NextStageToUnwind
NextStageToUnwind --> |Next stage| UnwindStage
NextStageToUnwind --> |No stages left| RunLoop
UnwindStage --> |Error| Error
UnwindStage --> |Unwound| NextStageToUnwind
LoopDone --> |Target block reached| Done
LoopDone --> |Target block not reached| RunLoop

docs/mermaid/swarm.mmd Normal file

@@ -0,0 +1,15 @@
graph TB
connections(TCP Listener)
Discovery[(Discovery)]
fetchRequest(Client Interfaces)
Sessions[(SessionManager)]
SessionTask[(Peer Session)]
State[(State)]
StateFetch[(State Fetcher)]
connections --> |incoming| Sessions
State --> |initiate outgoing| Sessions
Discovery --> |update peers| State
Sessions --> |spawns| SessionTask
SessionTask <--> |handle state requests| State
fetchRequest --> |request Headers, Bodies| StateFetch
State --> |poll pending requests| StateFetch

docs/mermaid/tree.mmd Normal file

@@ -0,0 +1,21 @@
flowchart BT
subgraph canonical chain
CanonState:::state
block0canon:::canon -->block1canon:::canon -->block2canon:::canon -->block3canon:::canon -->
block4canon:::canon --> block5canon:::canon
end
block5canon --> block6pending1:::pending
block5canon --> block6pending2:::pending
subgraph sidechain2
S2State:::state
block3canon --> block4s2:::sidechain --> block5s2:::sidechain
end
subgraph sidechain1
S1State:::state
block2canon --> block3s1:::sidechain --> block4s1:::sidechain --> block5s1:::sidechain -->
block6s1:::sidechain
end
classDef state fill:#1882C4
classDef canon fill:#8AC926
classDef pending fill:#FFCA3A
classDef sidechain fill:#FF595E

docs/mermaid/txpool.mmd Normal file

@@ -0,0 +1,32 @@
graph TB
subgraph TxPool
direction TB
pool[(All Transactions)]
subgraph Subpools
direction TB
B3[(Queued)]
B1[(Pending)]
B2[(Basefee)]
B4[(Blob)]
end
end
discard([discard])
production([Block Production])
new([New Block])
A[Incoming Tx] --> B[Validation] -->|insert| pool
pool --> |if ready + blobfee too low| B4
pool --> |if ready| B1
pool --> |if ready + basfee too low| B2
pool --> |nonce gap or lack of funds| B3
pool --> |update| pool
B1 --> |best| production
B2 --> |worst| discard
B3 --> |worst| discard
B4 --> |worst| discard
B1 --> |increased blob fee| B4
B4 --> |decreased blob fee| B1
B1 --> |increased base fee| B2
B2 --> |decreased base fee| B1
B3 --> |promote| B1
B3 --> |promote| B2
new --> |apply state changes| pool
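
Read top to bottom, the routing this diagram encodes is: a nonce gap or lack of funds parks a transaction in Queued, otherwise the fee caps decide between Pending, Basefee, and Blob, and fee changes applied by new blocks move transactions between those subpools. A compact sketch of that decision, with illustrative names rather than reth's actual types:

```rust
/// Illustrative subpool labels mirroring the diagram above.
#[derive(Debug, PartialEq)]
enum SubPool {
    Pending,
    BaseFee,
    Blob,
    Queued,
}

/// Where a transaction lands, following the edges out of `pool` in the diagram.
fn route(
    has_nonce_gap: bool,
    has_funds: bool,
    fee_cap: u128,
    pending_base_fee: u128,
    is_blob: bool,
    blob_fee_cap: u128,
    pending_blob_fee: u128,
) -> SubPool {
    if has_nonce_gap || !has_funds {
        return SubPool::Queued; // "nonce gap or lack of funds"
    }
    if is_blob && blob_fee_cap < pending_blob_fee {
        return SubPool::Blob; // "if ready + blobfee too low"
    }
    if fee_cap < pending_base_fee {
        return SubPool::BaseFee; // "if ready + basfee too low"
    }
    SubPool::Pending // "if ready": eligible for block production
}
```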