mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Prioritisation network manager + transactions manager + eth request handler (#6590)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
18
Cargo.lock
generated
18
Cargo.lock
generated
@ -2992,6 +2992,23 @@ version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
|
||||
|
||||
[[package]]
|
||||
name = "futures-test"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce388237b32ac42eca0df1ba55ed3bbda4eaf005d7d4b5dbc0b20ab962928ac9"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
"pin-project",
|
||||
"pin-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-timer"
|
||||
version = "3.0.3"
|
||||
@ -6293,6 +6310,7 @@ dependencies = [
|
||||
"ethers-signers",
|
||||
"fnv",
|
||||
"futures",
|
||||
"futures-test",
|
||||
"humantime-serde",
|
||||
"itertools 0.12.1",
|
||||
"linked_hash_set",
|
||||
|
||||
@ -37,6 +37,7 @@ pin-project.workspace = true
|
||||
tokio = { workspace = true, features = ["io-util", "net", "macros", "rt-multi-thread", "time"] }
|
||||
tokio-stream.workspace = true
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
futures-test = "0.3.30"
|
||||
|
||||
# io
|
||||
serde = { workspace = true, optional = true }
|
||||
|
||||
78
crates/net/network/src/budget.rs
Normal file
78
crates/net/network/src/budget.rs
Normal file
@ -0,0 +1,78 @@
|
||||
/// Default budget to try and drain streams.
|
||||
///
|
||||
/// Default is 10 iterations.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_STREAM: u32 = 10;
|
||||
|
||||
/// Default budget to try and drain [`Swarm`](crate::swarm::Swarm).
|
||||
///
|
||||
/// Default is 10 [`SwarmEvent`](crate::swarm::SwarmEvent)s.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_SWARM: u32 = 10;
|
||||
|
||||
/// Default budget to try and drain pending messages from [`NetworkHandle`](crate::NetworkHandle)
|
||||
/// channel. Polling the [`TransactionsManager`](crate::transactions::TransactionsManager) future
|
||||
/// sends these types of messages.
|
||||
//
|
||||
// Default is 40 outgoing transaction messages.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL: u32 =
|
||||
4 * DEFAULT_BUDGET_TRY_DRAIN_STREAM;
|
||||
|
||||
/// Default budget to try and drain stream of
|
||||
/// [`NetworkTransactionEvent`](crate::transactions::NetworkTransactionEvent)s from
|
||||
/// [`NetworkManager`](crate::NetworkManager).
|
||||
///
|
||||
/// Default is 10 incoming transaction messages.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS: u32 = DEFAULT_BUDGET_TRY_DRAIN_SWARM;
|
||||
|
||||
/// Default budget to try and flush pending pool imports to pool. This number reflects the number
|
||||
/// of transactions that can be queued for import to pool in each iteration of the loop in the
|
||||
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
|
||||
//
|
||||
// Default is 40 pending pool imports.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS: u32 = 4 * DEFAULT_BUDGET_TRY_DRAIN_STREAM;
|
||||
|
||||
/// Default budget to try and stream hashes of successfully imported transactions from the pool.
|
||||
///
|
||||
/// Default is naturally same as the number of transactions to attempt importing,
|
||||
/// [`DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS`], so 40 pool imports.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS: u32 =
|
||||
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS;
|
||||
|
||||
/// Polls the given stream. Breaks with `true` if there maybe is more work.
|
||||
#[macro_export]
|
||||
macro_rules! poll_nested_stream_with_budget {
|
||||
($target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
|
||||
let mut budget: u32 = $budget;
|
||||
|
||||
loop {
|
||||
match $poll_stream {
|
||||
Poll::Ready(Some(item)) => {
|
||||
let mut f = $on_ready_some;
|
||||
f(item);
|
||||
|
||||
budget = budget.saturating_sub(1);
|
||||
if budget == 0 {
|
||||
break true
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
$($on_ready_none;)? // todo: handle error case with $target and $label
|
||||
break false
|
||||
}
|
||||
Poll::Pending => break false,
|
||||
}
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
/// Metered poll of the given stream. Breaks with `true` if there maybe is more work.
|
||||
#[macro_export]
|
||||
macro_rules! metered_poll_nested_stream_with_budget {
|
||||
($acc:ident, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
|
||||
duration_metered_exec!(
|
||||
{
|
||||
$crate::poll_nested_stream_with_budget!($target, $label, $budget, $poll_stream, $on_ready_some $(, $on_ready_none;)?)
|
||||
},
|
||||
$acc
|
||||
)
|
||||
}};
|
||||
}
|
||||
@ -1,6 +1,9 @@
|
||||
//! Blocks/Headers management for the p2p network.
|
||||
|
||||
use crate::{metrics::EthRequestHandlerMetrics, peers::PeersHandle};
|
||||
use crate::{
|
||||
budget::DEFAULT_BUDGET_TRY_DRAIN_STREAM, metrics::EthRequestHandlerMetrics, peers::PeersHandle,
|
||||
poll_nested_stream_with_budget,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use reth_eth_wire::{
|
||||
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, NodeData,
|
||||
@ -240,11 +243,13 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
loop {
|
||||
match this.incoming_requests.poll_next_unpin(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(incoming)) => match incoming {
|
||||
let maybe_more_incoming_requests = poll_nested_stream_with_budget!(
|
||||
"net::eth",
|
||||
"Incoming eth requests stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
this.incoming_requests.poll_next_unpin(cx),
|
||||
|incoming| {
|
||||
match incoming {
|
||||
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
|
||||
this.on_headers_request(peer_id, request, response)
|
||||
}
|
||||
@ -255,9 +260,18 @@ where
|
||||
IncomingEthRequest::GetReceipts { peer_id, request, response } => {
|
||||
this.on_receipts_request(peer_id, request, response)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// stream is fully drained and import futures pending
|
||||
if maybe_more_incoming_requests {
|
||||
// make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -114,6 +114,7 @@
|
||||
/// Common helpers for network testing.
|
||||
pub mod test_utils;
|
||||
|
||||
mod budget;
|
||||
mod builder;
|
||||
mod cache;
|
||||
pub mod config;
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the RLPx session.
|
||||
|
||||
use crate::{
|
||||
budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
|
||||
config::NetworkConfig,
|
||||
discovery::Discovery,
|
||||
error::{NetworkError, ServiceKind},
|
||||
@ -26,6 +27,7 @@ use crate::{
|
||||
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
|
||||
network::{NetworkHandle, NetworkHandleMessage},
|
||||
peers::{PeersHandle, PeersManager},
|
||||
poll_nested_stream_with_budget,
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
session::SessionManager,
|
||||
state::NetworkState,
|
||||
@ -911,25 +913,7 @@ where
|
||||
this.on_block_import_result(outcome);
|
||||
}
|
||||
|
||||
// process incoming messages from a handle
|
||||
let start_network_handle = Instant::now();
|
||||
loop {
|
||||
match this.from_handle_rx.poll_next_unpin(cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(None) => {
|
||||
// This is only possible if the channel was deliberately closed since we
|
||||
// always have an instance of
|
||||
// `NetworkHandle`
|
||||
error!("Network message channel closed.");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
Poll::Ready(Some(msg)) => this.on_handle_message(msg),
|
||||
};
|
||||
}
|
||||
|
||||
poll_durations.acc_network_handle = start_network_handle.elapsed();
|
||||
|
||||
// This loop drives the entire state of network and does a lot of work. Under heavy load
|
||||
// These loops drive the entire state of network and does a lot of work. Under heavy load
|
||||
// (many messages/events), data may arrive faster than it can be processed (incoming
|
||||
// messages/requests -> events), and it is possible that more data has already arrived by
|
||||
// the time an internal event is processed. Which could turn this loop into a busy loop.
|
||||
@ -947,28 +931,40 @@ where
|
||||
// iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
|
||||
// range of what's recommended as rule of thumb.
|
||||
// <https://ryhl.io/blog/async-what-is-blocking/>
|
||||
let mut budget = 10;
|
||||
|
||||
loop {
|
||||
// advance the swarm
|
||||
match this.swarm.poll_next_unpin(cx) {
|
||||
Poll::Pending | Poll::Ready(None) => break,
|
||||
Poll::Ready(Some(event)) => this.on_swarm_event(event),
|
||||
}
|
||||
|
||||
// ensure we still have enough budget for another iteration
|
||||
budget -= 1;
|
||||
if budget == 0 {
|
||||
trace!(target: "net", budget=10, "exhausted network manager budget");
|
||||
// make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
break
|
||||
}
|
||||
}
|
||||
// process incoming messages from a handle (`TransactionsManager` has one)
|
||||
//
|
||||
// will only be closed if the channel was deliberately closed since we always have an
|
||||
// instance of `NetworkHandle`
|
||||
let start_network_handle = Instant::now();
|
||||
let maybe_more_handle_messages = poll_nested_stream_with_budget!(
|
||||
"net",
|
||||
"Network message channel",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
|
||||
this.from_handle_rx.poll_next_unpin(cx),
|
||||
|msg| this.on_handle_message(msg),
|
||||
error!("Network channel closed");
|
||||
);
|
||||
poll_durations.acc_network_handle = start_network_handle.elapsed();
|
||||
|
||||
// process incoming messages from the network
|
||||
let maybe_more_swarm_events = poll_nested_stream_with_budget!(
|
||||
"net",
|
||||
"Swarm events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_SWARM,
|
||||
this.swarm.poll_next_unpin(cx),
|
||||
|event| this.on_swarm_event(event),
|
||||
);
|
||||
poll_durations.acc_swarm =
|
||||
start_network_handle.elapsed() - poll_durations.acc_network_handle;
|
||||
|
||||
// all streams are fully drained and import futures pending
|
||||
if maybe_more_handle_messages || maybe_more_swarm_events {
|
||||
// make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
this.update_poll_metrics(start, poll_durations);
|
||||
|
||||
Poll::Pending
|
||||
|
||||
@ -1,10 +1,16 @@
|
||||
//! Transactions management for the p2p network.
|
||||
|
||||
use crate::{
|
||||
budget::{
|
||||
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
|
||||
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
},
|
||||
cache::LruCache,
|
||||
duration_metered_exec,
|
||||
manager::NetworkEvent,
|
||||
message::{PeerRequest, PeerRequestSender},
|
||||
metered_poll_nested_stream_with_budget,
|
||||
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
|
||||
NetworkEvents, NetworkHandle,
|
||||
};
|
||||
@ -1065,6 +1071,58 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a batch import results.
|
||||
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
|
||||
for res in batch_results {
|
||||
match res {
|
||||
Ok(hash) => {
|
||||
self.on_good_import(hash);
|
||||
}
|
||||
Err(err) => {
|
||||
// if we're _currently_ syncing and the transaction is bad we
|
||||
// ignore it, otherwise we penalize the peer that sent the bad
|
||||
// transaction with the assumption that the peer should have
|
||||
// known that this transaction is bad. (e.g. consensus
|
||||
// rules)
|
||||
if err.is_bad_transaction() && !self.network.is_syncing() {
|
||||
debug!(target: "net::tx", %err, "bad pool transaction import");
|
||||
self.on_bad_import(err.hash);
|
||||
continue
|
||||
}
|
||||
self.on_good_import(err.hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a [`FetchEvent`].
|
||||
fn on_fetch_event(&mut self, fetch_event: FetchEvent) {
|
||||
match fetch_event {
|
||||
FetchEvent::TransactionsFetched { peer_id, transactions } => {
|
||||
self.import_transactions(peer_id, transactions, TransactionSource::Response);
|
||||
}
|
||||
FetchEvent::FetchError { peer_id, error } => {
|
||||
trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
|
||||
self.on_request_error(peer_id, error);
|
||||
}
|
||||
FetchEvent::EmptyResponse { peer_id } => {
|
||||
trace!(target: "net::tx", ?peer_id, "peer returned empty response");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs an operation to fetch hashes that are cached in in [`TransactionFetcher`].
|
||||
fn on_fetch_hashes_pending_fetch(&mut self) {
|
||||
// try drain transaction hashes pending fetch
|
||||
let info = &self.pending_pool_imports_info;
|
||||
let max_pending_pool_imports = info.max_pending_pool_imports;
|
||||
let has_capacity_wrt_pending_pool_imports =
|
||||
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
|
||||
|
||||
self.transaction_fetcher
|
||||
.on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
|
||||
}
|
||||
|
||||
fn report_peer_bad_transactions(&self, peer_id: PeerId) {
|
||||
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
|
||||
self.metrics.reported_bad_transactions.increment(1);
|
||||
@ -1137,207 +1195,149 @@ where
|
||||
|
||||
let this = self.get_mut();
|
||||
|
||||
// If the budget is exhausted we manually yield back control to tokio. See
|
||||
// `NetworkManager` for more context on the design pattern.
|
||||
let mut budget = 1024;
|
||||
// All streams are polled until their corresponding budget is exhausted, then we manually
|
||||
// yield back control to tokio. See `NetworkManager` for more context on the design
|
||||
// pattern.
|
||||
|
||||
loop {
|
||||
let mut some_ready = false;
|
||||
// Advance pool imports (flush txns to pool).
|
||||
//
|
||||
// Note, this is done in batches. A batch is filled from one `Transactions`
|
||||
// broadcast messages or one `PooledTransactions` response at a time. The
|
||||
// minimum batch size is 1 transaction (and might often be the case with blob
|
||||
// transactions).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially validate >200k transactions. More if the message size
|
||||
// is bigger than the soft limit on a `PooledTransactions` response which is
|
||||
// 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
|
||||
let acc = &mut poll_durations.acc_pending_imports;
|
||||
let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Batched pool imports stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
|
||||
this.pool_imports.poll_next_unpin(cx),
|
||||
|batch_results| this.on_batch_import_result(batch_results)
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_network_events;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Advance network/peer related events (update peers map).
|
||||
if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
|
||||
this.on_network_event(event);
|
||||
some_ready = true;
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
// Advance network/peer related events (update peers map).
|
||||
let acc = &mut poll_durations.acc_network_events;
|
||||
let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Network events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
this.network_events.poll_next_unpin(cx),
|
||||
|event| this.on_network_event(event)
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_pending_fetch;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Tries to drain hashes pending fetch cache if the tx manager currently has
|
||||
// capacity for this (fetch txns).
|
||||
//
|
||||
// Sends at most one request.
|
||||
if this.has_capacity_for_fetching_pending_hashes() {
|
||||
// try drain transaction hashes pending fetch
|
||||
let info = &this.pending_pool_imports_info;
|
||||
let max_pending_pool_imports = info.max_pending_pool_imports;
|
||||
let has_capacity_wrt_pending_pool_imports =
|
||||
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
|
||||
// Advances new __pending__ transactions, transactions that were successfully inserted into
|
||||
// pending set in pool (are valid), and propagates them (inform peers which
|
||||
// transactions we have seen).
|
||||
//
|
||||
// We try to drain this to batch the transactions in a single message.
|
||||
//
|
||||
// We don't expect this buffer to be large, since only pending transactions are
|
||||
// emitted here.
|
||||
let mut new_txs = Vec::new();
|
||||
let acc = &mut poll_durations.acc_imported_txns;
|
||||
let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Pending transactions stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
|
||||
this.pending_transactions.poll_next_unpin(cx),
|
||||
|hash| new_txs.push(hash)
|
||||
);
|
||||
if !new_txs.is_empty() {
|
||||
this.on_new_pending_transactions(new_txs);
|
||||
}
|
||||
|
||||
this.transaction_fetcher.on_fetch_pending_hashes(
|
||||
&this.peers,
|
||||
has_capacity_wrt_pending_pool_imports,
|
||||
);
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
// Advance inflight fetch requests (flush transaction fetcher and queue for
|
||||
// import to pool).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially queue >200k transactions for insertion to pool. More
|
||||
// if the message size is bigger than the soft limit on a `PooledTransactions`
|
||||
// response which is 2 MiB.
|
||||
let acc = &mut poll_durations.acc_fetch_events;
|
||||
let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Transaction fetch events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
this.transaction_fetcher.poll_next_unpin(cx),
|
||||
|event| this.on_fetch_event(event),
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_cmds;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Advance commands (propagate/fetch/serve txns).
|
||||
if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
|
||||
this.on_command(cmd);
|
||||
some_ready = true;
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
// Advance incoming transaction events (stream new txns/announcements from
|
||||
// network manager and queue for import to pool/fetch txns).
|
||||
//
|
||||
// This will potentially remove hashes from hashes pending fetch, it the event
|
||||
// is an announcement (if same hashes are announced that didn't fit into a
|
||||
// previous request).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (128 KiB / 10 bytes > 13k transactions).
|
||||
//
|
||||
// If this is an event with `Transactions` message, since transactions aren't
|
||||
// validated until they are inserted into the pool, this can potentially queue
|
||||
// >13k transactions for insertion to pool. More if the message size is bigger
|
||||
// than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
|
||||
let acc = &mut poll_durations.acc_tx_events;
|
||||
let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Network transaction events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
|
||||
this.transaction_events.poll_next_unpin(cx),
|
||||
|event| this.on_network_tx_event(event),
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_tx_events;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Advance incoming transaction events (stream new txns/announcements from
|
||||
// network manager and queue for import to pool/fetch txns).
|
||||
//
|
||||
// This will potentially remove hashes from hashes pending fetch, it the event
|
||||
// is an announcement (if same hashes are announced that didn't fit into a
|
||||
// previous request).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (128 KiB / 10 bytes > 13k transactions).
|
||||
//
|
||||
// If this is an event with `Transactions` message, since transactions aren't
|
||||
// validated until they are inserted into the pool, this can potentially queue
|
||||
// >13k transactions for insertion to pool. More if the message size is bigger
|
||||
// than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
|
||||
if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
|
||||
this.on_network_tx_event(event);
|
||||
some_ready = true;
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
// Tries to drain hashes pending fetch cache if the tx manager currently has
|
||||
// capacity for this (fetch txns).
|
||||
//
|
||||
// Sends at most one request.
|
||||
let acc = &mut poll_durations.acc_pending_fetch;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
if this.has_capacity_for_fetching_pending_hashes() {
|
||||
this.on_fetch_hashes_pending_fetch();
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_fetch_events;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
this.transaction_fetcher.update_metrics();
|
||||
// Advance commands (propagate/fetch/serve txns).
|
||||
let acc = &mut poll_durations.acc_cmds;
|
||||
let maybe_more_commands = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::tx",
|
||||
"Commands channel",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
this.command_rx.poll_next_unpin(cx),
|
||||
|cmd| this.on_command(cmd)
|
||||
);
|
||||
|
||||
// Advance fetching transaction events (flush transaction fetcher and queue for
|
||||
// import to pool).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially queue >200k transactions for insertion to pool. More
|
||||
// if the message size is bigger than the soft limit on a `PooledTransactions`
|
||||
// response which is 2 MiB.
|
||||
if let Poll::Ready(Some(fetch_event)) =
|
||||
this.transaction_fetcher.poll_next_unpin(cx)
|
||||
{
|
||||
match fetch_event {
|
||||
FetchEvent::TransactionsFetched { peer_id, transactions } => {
|
||||
this.import_transactions(
|
||||
peer_id,
|
||||
transactions,
|
||||
TransactionSource::Response,
|
||||
);
|
||||
}
|
||||
FetchEvent::FetchError { peer_id, error } => {
|
||||
trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
|
||||
this.on_request_error(peer_id, error);
|
||||
}
|
||||
FetchEvent::EmptyResponse { peer_id } => {
|
||||
trace!(target: "net::tx", ?peer_id, "peer returned empty response");
|
||||
}
|
||||
}
|
||||
some_ready = true;
|
||||
}
|
||||
this.transaction_fetcher.update_metrics();
|
||||
|
||||
this.transaction_fetcher.update_metrics();
|
||||
},
|
||||
acc
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_pending_imports;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Advance pool imports (flush txns to pool).
|
||||
//
|
||||
// Note, this is done in batches. A batch is filled from one `Transactions`
|
||||
// broadcast messages or one `PooledTransactions` response at a time. The
|
||||
// minimum batch size is 1 transaction (and might often be the case with blob
|
||||
// transactions).
|
||||
//
|
||||
// The smallest decodable transaction is an empty legacy transaction, 10 bytes
|
||||
// (2 MiB / 10 bytes > 200k transactions).
|
||||
//
|
||||
// Since transactions aren't validated until they are inserted into the pool,
|
||||
// this can potentially validate >200k transactions. More if the message size
|
||||
// is bigger than the soft limit on a `PooledTransactions` response which is
|
||||
// 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
|
||||
if let Poll::Ready(Some(batch_import_res)) =
|
||||
this.pool_imports.poll_next_unpin(cx)
|
||||
{
|
||||
for res in batch_import_res {
|
||||
match res {
|
||||
Ok(hash) => {
|
||||
this.on_good_import(hash);
|
||||
}
|
||||
Err(err) => {
|
||||
// if we're _currently_ syncing and the transaction is bad we
|
||||
// ignore it, otherwise we penalize the peer that sent the bad
|
||||
// transaction with the assumption that the peer should have
|
||||
// known that this transaction is bad. (e.g. consensus
|
||||
// rules)
|
||||
if err.is_bad_transaction() && !this.network.is_syncing() {
|
||||
debug!(target: "net::tx", %err, "bad pool transaction import");
|
||||
this.on_bad_import(err.hash);
|
||||
continue
|
||||
}
|
||||
this.on_good_import(err.hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
some_ready = true;
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
|
||||
let acc = &mut poll_durations.acc_imported_txns;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
// Drain new __pending__ transactions handle and propagate transactions.
|
||||
//
|
||||
// We drain this to batch the transactions in a single message.
|
||||
//
|
||||
// We don't expect this buffer to be large, since only pending transactions are
|
||||
// emitted here.
|
||||
let mut new_txs = Vec::new();
|
||||
while let Poll::Ready(Some(hash)) =
|
||||
this.pending_transactions.poll_next_unpin(cx)
|
||||
{
|
||||
new_txs.push(hash);
|
||||
}
|
||||
if !new_txs.is_empty() {
|
||||
this.on_new_pending_transactions(new_txs);
|
||||
}
|
||||
},
|
||||
acc
|
||||
);
|
||||
|
||||
// all channels are fully drained and import futures pending
|
||||
if !some_ready {
|
||||
break
|
||||
}
|
||||
|
||||
budget -= 1;
|
||||
if budget <= 0 {
|
||||
// Make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
break
|
||||
}
|
||||
// all channels are fully drained and import futures pending
|
||||
if maybe_more_network_events ||
|
||||
maybe_more_commands ||
|
||||
maybe_more_tx_events ||
|
||||
maybe_more_tx_fetch_events ||
|
||||
maybe_more_pool_imports ||
|
||||
maybe_more_pending_txns
|
||||
{
|
||||
// make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
this.update_poll_metrics(start, poll_durations);
|
||||
|
||||
@ -98,6 +98,7 @@ where
|
||||
}
|
||||
|
||||
/// Endless future that [Self::clear_stale_filters] every `stale_filter_ttl` interval.
|
||||
/// Nonetheless, this endless future frees the thread at every await point.
|
||||
async fn watch_and_clear_stale_filters(&self) {
|
||||
let mut interval = tokio::time::interval(self.inner.stale_filter_ttl);
|
||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
|
||||
Reference in New Issue
Block a user