mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf(net): decrease budget EthRequestHandler + metrics (#8497)
This commit is contained in:
@ -3,6 +3,11 @@
|
||||
/// Default is 10 iterations.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_STREAM: u32 = 10;
|
||||
|
||||
/// Default budget to try and drain headers and bodies download streams.
|
||||
///
|
||||
/// Default is 4 iterations.
|
||||
pub const DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS: u32 = 4;
|
||||
|
||||
/// Default budget to try and drain [`Swarm`](crate::swarm::Swarm).
|
||||
///
|
||||
/// Default is 10 [`SwarmEvent`](crate::swarm::SwarmEvent)s.
|
||||
@ -68,8 +73,8 @@ macro_rules! poll_nested_stream_with_budget {
|
||||
/// Metered poll of the given stream. Breaks with `true` if there maybe is more work.
|
||||
#[macro_export]
|
||||
macro_rules! metered_poll_nested_stream_with_budget {
|
||||
($acc:ident, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
|
||||
duration_metered_exec!(
|
||||
($acc:expr, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
|
||||
$crate::duration_metered_exec!(
|
||||
{
|
||||
$crate::poll_nested_stream_with_budget!($target, $label, $budget, $poll_stream, $on_ready_some $(, $on_ready_none;)?)
|
||||
},
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
//! Blocks/Headers management for the p2p network.
|
||||
|
||||
use crate::{
|
||||
budget::DEFAULT_BUDGET_TRY_DRAIN_STREAM, metrics::EthRequestHandlerMetrics, peers::PeersHandle,
|
||||
poll_nested_stream_with_budget,
|
||||
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
|
||||
metrics::EthRequestHandlerMetrics, peers::PeersHandle,
|
||||
};
|
||||
use alloy_rlp::Encodable;
|
||||
use futures::StreamExt;
|
||||
@ -18,6 +18,7 @@ use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::{mpsc::Receiver, oneshot};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
@ -239,10 +240,12 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
let maybe_more_incoming_requests = poll_nested_stream_with_budget!(
|
||||
let mut acc = Duration::ZERO;
|
||||
let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
"net::eth",
|
||||
"Incoming eth requests stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
|
||||
this.incoming_requests.poll_next_unpin(cx),
|
||||
|incoming| {
|
||||
match incoming {
|
||||
@ -262,6 +265,8 @@ where
|
||||
},
|
||||
);
|
||||
|
||||
this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
|
||||
|
||||
// stream is fully drained and import futures pending
|
||||
if maybe_more_incoming_requests {
|
||||
// make sure we're woken up again
|
||||
|
||||
@ -161,8 +161,8 @@ impl<C> NetworkManager<C> {
|
||||
// update metrics for whole poll function
|
||||
metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
|
||||
// update poll metrics for nested items
|
||||
metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
|
||||
metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64());
|
||||
metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
|
||||
metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -69,12 +69,12 @@ pub struct NetworkMetrics {
|
||||
/// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle.
|
||||
///
|
||||
/// Duration in seconds.
|
||||
pub(crate) duration_poll_network_handle: Gauge,
|
||||
pub(crate) acc_duration_poll_network_handle: Gauge,
|
||||
/// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
|
||||
/// [`NetworkManager`](crate::NetworkManager) future.
|
||||
///
|
||||
/// Duration in seconds.
|
||||
pub(crate) duration_poll_swarm: Gauge,
|
||||
pub(crate) acc_duration_poll_swarm: Gauge,
|
||||
}
|
||||
|
||||
/// Metrics for SessionManager
|
||||
@ -226,12 +226,12 @@ pub struct TransactionFetcherMetrics {
|
||||
/// accumulator value passed as a mutable reference.
|
||||
#[macro_export]
|
||||
macro_rules! duration_metered_exec {
|
||||
($code:expr, $acc:ident) => {{
|
||||
let start = Instant::now();
|
||||
($code:expr, $acc:expr) => {{
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
let res = $code;
|
||||
|
||||
*$acc += start.elapsed();
|
||||
$acc += start.elapsed();
|
||||
|
||||
res
|
||||
}};
|
||||
@ -321,6 +321,10 @@ pub struct EthRequestHandlerMetrics {
|
||||
|
||||
/// Number of GetNodeData requests received
|
||||
pub(crate) eth_node_data_requests_received_total: Counter,
|
||||
|
||||
/// Duration in seconds of call to poll
|
||||
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
|
||||
pub(crate) acc_duration_poll_eth_req_handler: Gauge,
|
||||
}
|
||||
|
||||
/// Eth67 announcement metrics, track entries by TxType
|
||||
|
||||
@ -50,7 +50,7 @@ use std::{
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
task::{ready, Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
|
||||
use tracing::{debug, trace};
|
||||
@ -430,7 +430,6 @@ impl TransactionFetcher {
|
||||
let budget_find_idle_fallback_peer = self
|
||||
.search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
|
||||
|
||||
let acc = &mut search_durations.find_idle_peer;
|
||||
let peer_id = duration_metered_exec!(
|
||||
{
|
||||
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
|
||||
@ -444,7 +443,7 @@ impl TransactionFetcher {
|
||||
|
||||
peer_id
|
||||
},
|
||||
acc
|
||||
search_durations.find_idle_peer
|
||||
);
|
||||
|
||||
// peer should always exist since `is_session_active` already checked
|
||||
@ -460,7 +459,6 @@ impl TransactionFetcher {
|
||||
&has_capacity_wrt_pending_pool_imports,
|
||||
);
|
||||
|
||||
let acc = &mut search_durations.fill_request;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
self.fill_request_from_hashes_pending_fetch(
|
||||
@ -469,7 +467,7 @@ impl TransactionFetcher {
|
||||
budget_fill_request,
|
||||
)
|
||||
},
|
||||
acc
|
||||
search_durations.fill_request
|
||||
);
|
||||
|
||||
// free unused memory
|
||||
|
||||
@ -1231,9 +1231,8 @@ where
|
||||
// this can potentially validate >200k transactions. More if the message size
|
||||
// is bigger than the soft limit on a `PooledTransactions` response which is
|
||||
// 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
|
||||
let acc = &mut poll_durations.acc_pending_imports;
|
||||
let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_pending_imports,
|
||||
"net::tx",
|
||||
"Batched pool imports stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
|
||||
@ -1242,9 +1241,8 @@ where
|
||||
);
|
||||
|
||||
// Advance network/peer related events (update peers map).
|
||||
let acc = &mut poll_durations.acc_network_events;
|
||||
let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_network_events,
|
||||
"net::tx",
|
||||
"Network events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
@ -1261,9 +1259,8 @@ where
|
||||
// We don't expect this buffer to be large, since only pending transactions are
|
||||
// emitted here.
|
||||
let mut new_txs = Vec::new();
|
||||
let acc = &mut poll_durations.acc_imported_txns;
|
||||
let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_imported_txns,
|
||||
"net::tx",
|
||||
"Pending transactions stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
|
||||
@ -1284,9 +1281,8 @@ where
|
||||
// this can potentially queue >200k transactions for insertion to pool. More
|
||||
// if the message size is bigger than the soft limit on a `PooledTransactions`
|
||||
// response which is 2 MiB.
|
||||
let acc = &mut poll_durations.acc_fetch_events;
|
||||
let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_fetch_events,
|
||||
"net::tx",
|
||||
"Transaction fetch events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
@ -1308,9 +1304,8 @@ where
|
||||
// validated until they are inserted into the pool, this can potentially queue
|
||||
// >13k transactions for insertion to pool. More if the message size is bigger
|
||||
// than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
|
||||
let acc = &mut poll_durations.acc_tx_events;
|
||||
let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_tx_events,
|
||||
"net::tx",
|
||||
"Network transaction events stream",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
|
||||
@ -1322,20 +1317,18 @@ where
|
||||
// capacity for this (fetch txns).
|
||||
//
|
||||
// Sends at most one request.
|
||||
let acc = &mut poll_durations.acc_pending_fetch;
|
||||
duration_metered_exec!(
|
||||
{
|
||||
if this.has_capacity_for_fetching_pending_hashes() {
|
||||
this.on_fetch_hashes_pending_fetch();
|
||||
}
|
||||
},
|
||||
acc
|
||||
poll_durations.acc_pending_fetch
|
||||
);
|
||||
|
||||
// Advance commands (propagate/fetch/serve txns).
|
||||
let acc = &mut poll_durations.acc_cmds;
|
||||
let maybe_more_commands = metered_poll_nested_stream_with_budget!(
|
||||
acc,
|
||||
poll_durations.acc_cmds,
|
||||
"net::tx",
|
||||
"Commands channel",
|
||||
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
|
||||
|
||||
@ -2521,7 +2521,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "builder",
|
||||
"expr": "reth_network_duration_poll_network_handle{instance=\"$instance\"}",
|
||||
"expr": "reth_network_acc_duration_poll_network_handle{instance=\"$instance\"}",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
"includeNullMetadata": true,
|
||||
@ -2538,7 +2538,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "builder",
|
||||
"expr": "reth_network_duration_poll_swarm{instance=\"$instance\"}",
|
||||
"expr": "reth_network_acc_duration_poll_swarm{instance=\"$instance\"}",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
"includeNullMetadata": true,
|
||||
|
||||
Reference in New Issue
Block a user