mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Comment TransactionsManager (#6651)
This commit is contained in:
@ -1,3 +1,30 @@
|
||||
//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
|
||||
//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
|
||||
//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
|
||||
//! is already seen in a previous announcement. The hashes that remain from an announcement are
|
||||
//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
|
||||
//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other
|
||||
//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
|
||||
//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
|
||||
//! peer's session, this marks the peer as active with respect to
|
||||
//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
|
||||
//!
|
||||
//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
|
||||
//! pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is
|
||||
//! polled, it checks if any of fallback peer is idle. If so, it packs a request for that peer,
|
||||
//! filling it from the buffered hashes. It does so until there are no more idle peers or until
|
||||
//! the hashes buffer is empty.
|
||||
//!
|
||||
//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
|
||||
//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
|
||||
//! resolves with partial success, that is some of the requested hashes are not in the response,
|
||||
//! these are then buffered.
|
||||
//!
|
||||
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
|
||||
//! protocol. This means it's unlikely, that a valid hash, will be buffered for very long
|
||||
//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
|
||||
//! enough to buffer many hashes during network failure, to allow for recovery.
|
||||
|
||||
use crate::{
|
||||
cache::{LruCache, LruMap},
|
||||
message::PeerRequest,
|
||||
|
||||
@ -1,31 +1,4 @@
|
||||
//! Transactions management for the p2p network.
|
||||
//!
|
||||
//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
|
||||
//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
|
||||
//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
|
||||
//! is already seen in a previous announcement. The hashes that remain from an announcement are
|
||||
//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
|
||||
//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other
|
||||
//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
|
||||
//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
|
||||
//! peer's session, this marks the peer as active with respect to
|
||||
//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
|
||||
//!
|
||||
//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
|
||||
//! pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is
|
||||
//! polled, it checks if any of fallback peer is idle. If so, it packs a request for that peer,
|
||||
//! filling it from the buffered hashes. It does so until there are no more idle peers or until
|
||||
//! the hashes buffer is empty.
|
||||
//!
|
||||
//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
|
||||
//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
|
||||
//! resolves with partial success, that is some of the requested hashes are not in the response,
|
||||
//! these are then buffered.
|
||||
//!
|
||||
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
|
||||
//! protocol. This means it's unlikely, that a valid hash, will be buffered for very long
|
||||
//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
|
||||
//! enough to buffer many hashes during network failure, to allow for recovery.
|
||||
|
||||
use crate::{
|
||||
cache::LruCache,
|
||||
@ -1180,6 +1153,9 @@ struct TxManagerPollDurations {
|
||||
/// [`crate::NetworkManager`] for more context on the design pattern.
|
||||
///
|
||||
/// This should be spawned or used as part of `tokio::select!`.
|
||||
//
|
||||
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
|
||||
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
|
||||
impl<Pool> Future for TransactionsManager<Pool>
|
||||
where
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
@ -1202,7 +1178,7 @@ where
|
||||
let acc = &mut poll_durations.acc_network_events;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// advance network/peer related events
|
||||
// Advance network/peer related events (update peers map).
|
||||
if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
|
||||
this.on_network_event(event);
|
||||
some_ready = true;
|
||||
@ -1214,6 +1190,10 @@ where
|
||||
let acc = &mut poll_durations.acc_pending_fetch;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Tries to drain hashes pending fetch cache if the tx manager currently has
|
||||
// capacity for this (fetch txns).
|
||||
//
|
||||
// Sends at most one request.
|
||||
if this.has_capacity_for_fetching_pending_hashes() {
|
||||
// try drain transaction hashes pending fetch
|
||||
let info = &this.pending_pool_imports_info;
|
||||
@ -1238,7 +1218,7 @@ where
|
||||
let acc = &mut poll_durations.acc_cmds;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// advance commands
|
||||
// Advance commands (propagate/fetch/serve txns).
|
||||
if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
|
||||
this.on_command(cmd);
|
||||
some_ready = true;
|
||||
@ -1250,7 +1230,20 @@ where
|
||||
let acc = &mut poll_durations.acc_tx_events;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// advance incoming transaction events
|
||||
// Advance incoming transaction events (stream new txns/announcements from
|
||||
// network manager and queue for import to pool/fetch txns).
|
||||
//
|
||||
// This will potentially remove hashes from hashes pending fetch, it the event
|
||||
// is an announcement (if same hashes are announced that didn't fit into a
|
||||
// previous request).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (128 KiB / 10 bytes > 13k transactions).
|
||||
//
|
||||
// If this is an event with `Transactions` message, since transactions aren't
|
||||
// validated until they are inserted into the pool, this can potentially queue
|
||||
// >13k transactions for insertion to pool. More if the message size is bigger
|
||||
// than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
|
||||
if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
|
||||
this.on_network_tx_event(event);
|
||||
some_ready = true;
|
||||
@ -1264,7 +1257,16 @@ where
|
||||
{
|
||||
this.update_fetch_metrics();
|
||||
|
||||
// advance fetching transaction events
|
||||
// Advance fetching transaction events (flush transaction fetcher and queue for
|
||||
// import to pool).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially queue >200k transactions for insertion to pool. More
|
||||
// if the message size is bigger than the soft limit on a `PooledTransactions`
|
||||
// response which is 2 MiB.
|
||||
if let Poll::Ready(Some(fetch_event)) =
|
||||
this.transaction_fetcher.poll_next_unpin(cx)
|
||||
{
|
||||
@ -1295,7 +1297,20 @@ where
|
||||
let acc = &mut poll_durations.acc_pending_imports;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Advance all imports
|
||||
// Advance pool imports (flush txns to pool).
|
||||
//
|
||||
// Note, this is done in batches. A batch is filled from one `Transactions`
|
||||
// broadcast messages or one `PooledTransactions` response at a time. The
|
||||
// minimum batch size is 1 transaction (and might often be the case with blob
|
||||
// transactions).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially validate >200k transactions. More if the message size
|
||||
// is bigger than the soft limit on a `PooledTransactions` response which is
|
||||
// 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
|
||||
if let Poll::Ready(Some(batch_import_res)) =
|
||||
this.pool_imports.poll_next_unpin(cx)
|
||||
{
|
||||
@ -1329,9 +1344,11 @@ where
|
||||
let acc = &mut poll_durations.acc_imported_txns;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// drain new __pending__ transactions handle and propagate transactions.
|
||||
// we drain this to batch the transactions in a single message.
|
||||
// we don't expect this buffer to be large, since only pending transactions are
|
||||
// Drain new __pending__ transactions handle and propagate transactions.
|
||||
//
|
||||
// We drain this to batch the transactions in a single message.
|
||||
//
|
||||
// We don't expect this buffer to be large, since only pending transactions are
|
||||
// emitted here.
|
||||
let mut new_txs = Vec::new();
|
||||
while let Poll::Ready(Some(hash)) =
|
||||
|
||||
Reference in New Issue
Block a user