docs: improve pool imports + pending transactions docs (#6781)

This commit is contained in:
Matthias Seitz
2024-02-24 19:37:40 +01:00
committed by GitHub
parent 8abbe19392
commit 3669d9e046
2 changed files with 47 additions and 26 deletions

View File

@ -85,7 +85,9 @@ pub use self::constants::{
};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
/// The future for inserting a function into the pool
/// The future for importing transactions into the pool.
///
/// Resolves with the result of each transaction import.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
/// Api to interact with [TransactionsManager] task.
@ -220,7 +222,17 @@ pub struct TransactionsManager<Pool> {
/// This way we can track incoming transactions and prevent multiple pool imports for the same
/// transaction
transactions_by_peers: HashMap<TxHash, Vec<PeerId>>,
/// Transactions that are currently imported into the `Pool`
/// Transactions that are currently imported into the `Pool`.
///
/// The import process includes:
/// - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
/// valid, or for 4844 transaction the blobs are valid. See also
/// [EthTransactionValidator](reth_transaction_pool::validate::EthTransactionValidator)
/// - if the transaction is valid, it is added into the pool.
///
/// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
/// [TransactionPool::pending_transactions_listener] and arrive at the `pending_transactions`
/// receiver.
pool_imports: FuturesUnordered<PoolImportFuture>,
/// Stats on pending pool imports that help the node self-monitor.
pending_pool_imports_info: PendingPoolImportsInfo,
@ -237,7 +249,14 @@ pub struct TransactionsManager<Pool> {
/// This will only receive commands if a user manually sends a command to the manager through
/// the [TransactionsHandle] to interact with this type directly.
command_rx: UnboundedReceiverStream<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
/// A stream that yields new __pending__ transactions.
///
/// A transaction is considered __pending__ if it is executable on the current state of the
/// chain. In other words, this only yields transactions that satisfy all consensus
/// requirements, these include:
/// - no nonce gaps
/// - all dynamic fee requirements are (currently) met
/// - account has enough balance to cover the transaction's gas
pending_transactions: ReceiverStream<TxHash>,
/// Incoming events from the [`NetworkManager`](crate::NetworkManager).
transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent>,
@ -263,8 +282,8 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
&transactions_manager_config.transaction_fetcher_config,
);
// install a listener for new pending transactions that are allowed to be propagated over
// the network
// install a listener for new __pending__ transactions that are allowed to be propagated
// over the network
let pending = pool.pending_transactions_listener();
let pending_pool_imports_info = PendingPoolImportsInfo::default();
@ -385,18 +404,18 @@ where
}
}
/// Invoked when a new transaction is pending in the local pool.
/// Invoked when transactions in the local mempool are considered __pending__.
///
/// When new transactions appear in the pool, we propagate them to the network using the
/// `Transactions` and `NewPooledTransactionHashes` messages. The Transactions message relays
/// complete transaction objects and is typically sent to a small, random fraction of connected
/// peers.
/// When a transaction in the local mempool is moved to the pending pool, we propagate them to
/// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
/// messages. The Transactions message relays complete transaction objects and is typically
/// sent to a small, random fraction of connected peers.
///
/// All other peers receive a notification of the transaction hash and can request the
/// complete transaction object if it is unknown to them. The dissemination of complete
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
/// and won't need to request it.
fn on_new_transactions(&mut self, hashes: Vec<TxHash>) {
fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
// Nothing to propagate while initially syncing
if self.network.is_initially_syncing() {
return
@ -407,8 +426,8 @@ where
trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
// This fetches all transaction from the pool, including the blob transactions, which are
// only ever sent as hashes.
// This fetches all transaction from the pool, including the 4844 blob transactions but
// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
);
@ -417,7 +436,7 @@ where
self.pool.on_propagated(propagated);
}
/// Propagate the transactions to all connected peers either as full objects or hashes
/// Propagate the transactions to all connected peers either as full objects or hashes.
///
/// The message for new pooled hashes depends on the negotiated version of the stream.
/// See [NewPooledTransactionHashes]
@ -856,7 +875,9 @@ where
/// Handles a command received from a detached [`TransactionsHandle`]
fn on_command(&mut self, cmd: TransactionsCommand) {
match cmd {
TransactionsCommand::PropagateHash(hash) => self.on_new_transactions(vec![hash]),
TransactionsCommand::PropagateHash(hash) => {
self.on_new_pending_transactions(vec![hash])
}
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
}
@ -1297,10 +1318,10 @@ where
let acc = &mut poll_durations.acc_imported_txns;
duration_metered_exec!(
{
// drain successful pool insertions, handle and propagate transactions.
//
// higher priority! stream is drained
//
// drain new __pending__ transactions handle and propagate transactions.
// we drain this to batch the transactions in a single message.
// we don't expect this buffer to be large, since only pending transactions are
// emitted here.
let mut new_txs = Vec::new();
while let Poll::Ready(Some(hash)) =
this.pending_transactions.poll_next_unpin(cx)
@ -1308,7 +1329,7 @@ where
new_txs.push(hash);
}
if !new_txs.is_empty() {
this.on_new_transactions(new_txs);
this.on_new_pending_transactions(new_txs);
}
},
acc
@ -1545,9 +1566,9 @@ pub enum NetworkTransactionEvent {
/// Tracks stats about the [`TransactionsManager`].
#[derive(Debug)]
struct PendingPoolImportsInfo {
/// Number of transactions about to be imported into the pool.
/// Number of transactions that are currently being imported into pool.
pending_pool_imports: Arc<AtomicUsize>,
/// Max number of transactions about to be imported into the pool.
/// Max number of transactions allowed to be imported concurrently.
max_pending_pool_imports: usize,
}

View File

@ -114,15 +114,15 @@ pub trait TransactionPool: Send + Sync + Clone {
/// inserted into the pool that are allowed to be propagated.
///
/// Note: This is intended for networking and will __only__ yield transactions that are allowed
/// to be propagated over the network.
/// to be propagated over the network, see also [TransactionListenerKind].
///
/// Consumer: RPC/P2P
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
}
/// Returns a new Stream that yields transactions hashes for new __pending__ transactions
/// inserted into the pool depending on the given [TransactionListenerKind] argument.
/// Returns a new [Receiver] that yields transactions hashes for new __pending__ transactions
/// inserted into the pending pool depending on the given [TransactionListenerKind] argument.
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;
/// Returns a new stream that yields new valid transactions added to the pool.
@ -130,7 +130,7 @@ pub trait TransactionPool: Send + Sync + Clone {
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
}
/// Returns a new Stream that yields blob "sidecars" (blobs w/ assoc. kzg
/// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
/// commitments/proofs) for eip-4844 transactions inserted into the pool
fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;