mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
fix(net): add concurrency param from config to TransactionFetcherInfo (#11600)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -130,20 +130,27 @@ impl TransactionFetcher {
|
||||
|
||||
/// Sets up transaction fetcher with config
|
||||
pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
|
||||
let mut tx_fetcher = Self::default();
|
||||
let TransactionFetcherConfig {
|
||||
max_inflight_requests,
|
||||
max_capacity_cache_txns_pending_fetch,
|
||||
..
|
||||
} = *config;
|
||||
|
||||
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response =
|
||||
config.soft_limit_byte_size_pooled_transactions_response;
|
||||
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
|
||||
config.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
|
||||
tx_fetcher
|
||||
.metrics
|
||||
.capacity_inflight_requests
|
||||
.increment(tx_fetcher.info.max_inflight_requests as u64);
|
||||
tx_fetcher.info.max_capacity_cache_txns_pending_fetch =
|
||||
config.max_capacity_cache_txns_pending_fetch;
|
||||
let info = config.clone().into();
|
||||
|
||||
tx_fetcher
|
||||
let metrics = TransactionFetcherMetrics::default();
|
||||
metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
|
||||
|
||||
Self {
|
||||
active_peers: LruMap::new(max_inflight_requests),
|
||||
hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
|
||||
hashes_fetch_inflight_and_pending_fetch: LruMap::new(
|
||||
max_inflight_requests + max_capacity_cache_txns_pending_fetch,
|
||||
),
|
||||
info,
|
||||
metrics,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the specified hashes from inflight tracking.
|
||||
@ -178,7 +185,7 @@ impl TransactionFetcher {
|
||||
/// Returns `true` if peer is idle with respect to `self.inflight_requests`.
|
||||
pub fn is_idle(&self, peer_id: &PeerId) -> bool {
|
||||
let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
|
||||
if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER {
|
||||
if *inflight_count < self.info.max_inflight_requests_per_peer {
|
||||
return true
|
||||
}
|
||||
false
|
||||
@ -653,12 +660,12 @@ impl TransactionFetcher {
|
||||
return Some(new_announced_hashes)
|
||||
};
|
||||
|
||||
if *inflight_count >= DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER {
|
||||
if *inflight_count >= self.info.max_inflight_requests_per_peer {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hashes=?*new_announced_hashes,
|
||||
%conn_eth_version,
|
||||
max_concurrent_tx_reqs_per_peer=DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
|
||||
max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
|
||||
"limit for concurrent `GetPooledTransactions` requests per peer reached"
|
||||
);
|
||||
return Some(new_announced_hashes)
|
||||
@ -1288,10 +1295,12 @@ pub enum VerificationOutcome {
|
||||
}
|
||||
|
||||
/// Tracks stats about the [`TransactionFetcher`].
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Constructor)]
|
||||
pub struct TransactionFetcherInfo {
|
||||
/// Max inflight [`GetPooledTransactions`] requests.
|
||||
pub max_inflight_requests: usize,
|
||||
/// Max inflight [`GetPooledTransactions`] requests per peer.
|
||||
pub max_inflight_requests_per_peer: u8,
|
||||
/// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
|
||||
/// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
|
||||
/// transactions is requested).
|
||||
@ -1305,27 +1314,11 @@ pub struct TransactionFetcherInfo {
|
||||
pub max_capacity_cache_txns_pending_fetch: u32,
|
||||
}
|
||||
|
||||
impl TransactionFetcherInfo {
|
||||
/// Creates a new max
|
||||
pub const fn new(
|
||||
max_inflight_requests: usize,
|
||||
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
|
||||
soft_limit_byte_size_pooled_transactions_response: usize,
|
||||
max_capacity_cache_txns_pending_fetch: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
max_inflight_requests,
|
||||
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
|
||||
soft_limit_byte_size_pooled_transactions_response,
|
||||
max_capacity_cache_txns_pending_fetch,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TransactionFetcherInfo {
|
||||
fn default() -> Self {
|
||||
Self::new(
|
||||
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
|
||||
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
|
||||
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
|
||||
DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
|
||||
@ -1333,6 +1326,26 @@ impl Default for TransactionFetcherInfo {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
|
||||
fn from(config: TransactionFetcherConfig) -> Self {
|
||||
let TransactionFetcherConfig {
|
||||
max_inflight_requests,
|
||||
max_inflight_requests_per_peer,
|
||||
soft_limit_byte_size_pooled_transactions_response,
|
||||
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
|
||||
max_capacity_cache_txns_pending_fetch,
|
||||
} = config;
|
||||
|
||||
Self::new(
|
||||
max_inflight_requests as usize,
|
||||
max_inflight_requests_per_peer,
|
||||
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
|
||||
soft_limit_byte_size_pooled_transactions_response,
|
||||
max_capacity_cache_txns_pending_fetch,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct TxFetcherSearchDurations {
|
||||
find_idle_peer: Duration,
|
||||
|
||||
Reference in New Issue
Block a user